Package pylal :: Module fu_Condor
[hide private]
[frames] | no frames]

Source Code for Module pylal.fu_Condor

   1  """ 
   2  This module contains condor jobs / node classes for the followup dag 
   3   
   4  This program creates cache files for the output of inspiral hipe 
   5  """ 
   6   
   7  __author__ = 'Chad Hanna <channa@phys.lsu.edu>' 
   8   
   9  ############################################################################## 
  10  # import standard modules and append the lalapps prefix to the python path 
  11  import sys, os, copy, math 
  12  import math 
  13  import socket, time 
  14  import re, string 
  15  from optparse import * 
  16  import tempfile 
  17  import ConfigParser 
  18  import urlparse 
  19  from UserDict import UserDict 
  20  sys.path.append('@PYTHONLIBDIR@') 
  21   
  22  ############################################################################## 
  23  # import the modules we need to build the pipeline 
  24  from glue import pipeline 
  25  from glue import lal 
  26  from glue import segments 
  27  from glue import segmentsUtils 
  28  from pylal.webUtils import * 
  29  from pylal.webCondor import * 
  30  from lalapps import inspiral 
  31  from pylal import fu_utils 
  32  from glue.ligolw import lsctables 
  33   
  34  ###### WRAPPER FOR CONDOR DAG - TO MAKE THE FOLLOWUP DAG WEBIFIABLE ########### 
  35  ############################################################################### 
  36   
class followUpDAG(pipeline.CondorDAG, webTheDAG):
  """
  Condor DAG of the followup pipeline, combined with the webTheDAG mixin.

  The DAG file name and the Condor log file name are both derived from the
  name of the ini file.
  """

  def __init__(self, config_file, log_path):
    """
    config_file = name of the ini file; '.ini' is removed from it to obtain
                  the base name of the DAG
    log_path    = directory in which the Condor log file is created
    """
    self.basename = re.sub(r'\.ini',r'', config_file)
    # These module-level settings are kept because code outside this class
    # may rely on them.
    tempfile.tempdir = log_path
    tempfile.template = self.basename + '.dag.log.'
    # tempfile.mktemp() only returns a name (another process can grab it
    # before we open it) and, in Python 2, its default prefix is bound when
    # the module is loaded, so the template assignment above does not reach
    # it.  mkstemp() creates the file atomically with the wanted prefix.
    # Only the last path component of the base name is used as prefix so the
    # log file always lands directly in log_path.
    logPrefix = os.path.basename(self.basename) + '.dag.log.'
    fd, logfile = tempfile.mkstemp(prefix=logPrefix, dir=log_path)
    os.close(fd)
    pipeline.CondorDAG.__init__(self,logfile)
    self.set_dag_file(self.basename)
    self.jobsDict = {}
    # The list remote_nodes will contain the list of nodes run remotely
    # (such as V1 qscans)
    self.remote_nodes = []
52 53 ######################################################## 54 #### Methods common to several followup classes ######## 55 ######################################################## 56
def checkHipeCachePath(cp):
  """
  Return the value of the option 'hipe-cache-path' of the section
  [followup-hipe-cache], stripped of surrounding whitespace.

  Returns None when the value is empty, or when the option cannot be read
  (in which case an error message is written to stderr).
  """
  try:
    hipeCachePath = cp.get('followup-hipe-cache','hipe-cache-path').strip()
  except Exception:
    # "Exception" rather than a bare except, so that KeyboardInterrupt and
    # SystemExit are not swallowed.
    sys.stderr.write("ERROR: failure in checkHipeCachePath()\n")
    return None
  # an empty path is reported as "no path"
  return hipeCachePath or None
67 68 ############################################################################### 69 ### The following two functions check config parser (CP) objects. 70 ### If a CP object is missing the section or one of its options, the 71 ### missing entries are filled in from the supplied defaults 72 ############################################################################### 73
def verifyCP(cp,defaults):
  """
  Tell whether the config parser object cp already provides everything
  listed in defaults.

  defaults is a dictionary with the keys "section" (a section name) and
  "options" (an iterable of option names, e.g. a dictionary keyed by option).
  Returns True when cp has that section and each of those options in it,
  False otherwise.
  """
  sectionName = defaults["section"]
  if not cp.has_section(sectionName):
    return False
  for optionName in defaults["options"]:
    if not cp.has_option(sectionName, optionName):
      return False
  return True
84 # End verifyCP 85
def modifyCP(cp,defaults):
  """
  Add to the config parser object cp whatever is missing from defaults.

  defaults is a dictionary with the keys "section" (a section name) and
  "options" (a dictionary mapping option names to default values).  The
  section is created if needed and every option not yet present in it is
  set to its default value; options already present are left untouched.

  cp is modified in place and also returned.
  """
  section = defaults["section"]
  if not cp.has_section(section):
    cp.add_section(section)
  for key, val in defaults["options"].items():
    if not cp.has_option(section, key):
      # set() takes (section, option, value); the option name must be given
      cp.set(section, key, val)
  return cp
96 #End modifyCP 97 98 99 ############################################################################### 100 #### A CLASS TO DO FOLLOWUP INSPIRAL JOBS #################################### 101 ###############################################################################
class followUpInspJob(inspiral.InspiralJob,webTheJob):
  """
  Inspiral job of the followup DAG.
  """
  # options that must exist in the [condor] section before the job is built
  defaults = {
    "section": "condor",
    "options": {
      "universe": "vanilla",
      "inspiral_head": "lalapps_inspiral"
    }
  }

  def __init__(self,cp,type='plot'):
    """
    cp   = config parser object; missing [condor] defaults are added to it
    type = flavour of the job; it is appended to the job name, and 'head'
           selects the executable given by the option inspiral_head
    """
    cpIsComplete = verifyCP(cp,self.defaults)
    if not cpIsComplete:
      modifyCP(cp,self.defaults)
    inspiral.InspiralJob.__init__(self,cp)
    if type == 'head':
      headExecutable = cp.get('condor','inspiral_head')
      self.set_executable(string.strip(headExecutable))
    jobName = 'followUpInspJob' + type
    self.name = jobName
    self.setupJobWeb(jobName)
118 119
class followUpInspNode(inspiral.InspiralNode,webTheNode):
  """
  Node of a followUpInspJob.  The options of the node are taken from the
  process params of the original inspiral job (procParams) and adjusted
  according to the flavour of the followup ('plot', 'notrig', 'coh', 'chia'
  or 'head').
  """

  def __init__(self, inspJob, procParams, ifo, trig, cp,opts,dag, datafindCache, d_node, datafindCommand, type='plot', sngl_table = None):
    """
    inspJob         = the followUpInspJob this node belongs to
    procParams      = rows with the attributes param and value; each one is
                      turned into an option of the node unless it is skipped
    ifo             = key into trig.gpsTime; also used in bank / tag names
    trig            = the trigger (gpsTime, eventID are read from it)
    cp              = config parser object
    opts            = parsed command line options (inspiral, inspiral_head,
                      coh_inspiral, plot_chia, disable_dag_categories, ...)
    dag             = the dag to which the node is added
    datafindCache   = value of the frame-cache option when no hipe cache path
                      is configured
    d_node          = datafind node, made a parent of this node when valid
    datafindCommand = name of the attribute of opts that enables datafind
    type            = flavour of the node (see the class documentation)
    sngl_table      = indexed by ifo; only used for type 'head'
    """
    self.sample_rate = string.strip(cp.get('coh-inspiral','sample-rate'))
    # NOTE: the try/except that used to wrap the body is disabled, so the
    # "else" branch at the end of this method is never executed.
    if 1:#try:
      self.output_file_name = ""
      #inspiral.InspiralNode.__init__(self, inspJob)
      # the use of this class would require some reorganisation in fu_Condor.py
      # and webCondor.py in order to set up the jobs following the same scheme
      # as the way it is done for the Inspiral pipeline...
      pipeline.CondorDAGNode.__init__(self,inspJob)
      injFile = self.checkInjections(cp)
      hipeCache = checkHipeCachePath(cp)

      if type == "plot" or type == "notrig" or type == "coh" or type == "chia":
        # Here we define the trig-start-time and the trig-end-time;
        # The difference between these two times should be kept to 2s
        # Otherwise change the clustering window also
        hLengthAnalyzed = 1
        if type == "coh" or type == "chia": hLengthAnalyzed = 1.0
        self.set_trig_start( int(trig.gpsTime[ifo] - hLengthAnalyzed + 0.5) )
        self.set_trig_end( int(trig.gpsTime[ifo] + hLengthAnalyzed + 0.5) )

      if type == "plot" or type == "notrig" or type == "coh":
        self.add_var_opt("write-snrsq","")
        self.add_var_opt("write-chisq","")
        self.add_var_opt("write-spectrum","")
        self.add_var_opt("write-template","")
      if type == "chia" or type == "notrig" or type == "coh":
        self.add_var_opt("write-cdata","")

      if injFile:
        self.set_injections( injFile )

      # process params that are never copied verbatim to the node
      skipParams = ['minimal-match', 'bank-file', 'user-tag', 'injection-file', 'trig-start-time', 'trig-end-time']
      if not hipeCache:
        skipParams.append('frame-cache')
        self.add_var_opt('frame-cache',datafindCache)

      # initialize the extension of the output file. If the option
      # write_compress is found in procParams the extension will be overwritten
      # later as .xml.gz
      extension = ".xml"
      for row in procParams:
        param = row.param.strip("-")
        value = row.value
        # override the options for coherent jobs (useful for skymaps so that
        # we can bump up the sample rate)
        if type == "coh" and cp.has_option("coh-inspiral",param):
          value = cp.get("coh-inspiral",param)
        if type == "chia" and cp.has_option("coh-inspiral",param):
          value = cp.get("coh-inspiral",param)
        if param == 'bank-file':
          bankFile = value
        if type == "notrig" or type == "coh" or type == "chia":
          # if forceTrigger is true, we loosen the thresholds to
          # make sure to get a trigger
          if param == 'snr-threshold': value = "0.1"
          # rsq veto must be disabled
          if param == 'do-rsq-veto': continue
          if param == 'enable-rsq-veto': continue
          # chisq veto is disabled by loosening its threshold
          # we still want to generate the chisq time-series
          if param == 'chisq-threshold': value = "1.0e+06"
          # Using a window of 1s for clustering will allow us to always get
          # at least one trigger
          if param == 'cluster-method': value = 'window'
          if param == 'cluster-window': continue
          pass
        if param in skipParams: continue
        self.add_var_opt(param,value)
        # The attributes _AnalysisNode__end, _AnalysisNode__start,
        # _InspiralAnalysisNode__pad_data need to be defined before calling the
        # method "writeAll" of "class webTheDAG". This method calls
        # "write_sub_files()" in pipeline.py, which itself relies on the
        # "finalize()" method of "class InspiralAnalysisNode" in inspiral.py .
        # This is where all these attributes are being used. This hack is
        # required because "inspiral.InspiralNode.__init__(self, inspJob)"
        # currently does not work within "class followUpInspNode"
        if param == 'gps-end-time':
          self.__end = value
          self._AnalysisNode__end = int(value)
        if param == 'gps-start-time':
          self.__start = value
          self._AnalysisNode__start = int(value)
        if param == 'pad-data':
          self._InspiralAnalysisNode__pad_data = int(value)
        if param == 'ifo-tag':
          self.__ifotag = value
        # the first two characters of the channel name give the ifo
        if param == 'channel-name': self.inputIfo = value[0:2]
        if param == 'write-compress':
          extension = '.xml.gz'

      if type == "notrig" or type == "coh" or type == "chia":
        self.add_var_opt('cluster-window',str(hLengthAnalyzed/2.))
        self.add_var_opt('disable-rsq-veto',' ')

      # add the arguments that have been specified in the section
      # [followup-inspiral-extra] of the ini file (intended for 12-18 month
      # analysis)
      if cp.has_section("followup-inspiral-extra"):
        for (name,value) in cp.items("followup-inspiral-extra"):
          self.add_var_opt(name,value)

      # NOTE: for any other type (e.g. 'head') bankFile is only defined if
      # procParams contained a bank-file row.
      if type == "plot" or type == "coh":
        bankFile = 'trigTemplateBank/' + self.inputIfo + '-TRIGBANK_FOLLOWUP_' + type + str(trig.eventID) + '.xml.gz'
      if type == "chia":
        bankFile = 'trigTemplateBank/' + self.inputIfo + '-TRIGBANK_FOLLOWUP_coh' + str(trig.eventID) + '.xml.gz'
      if type == "notrig":
        bankFile = 'trigTemplateBank/' + ifo + '-TRIGBANK_FOLLOWUP_' + type + str(trig.eventID) + '.xml.gz'
      self.set_bank(bankFile)

      if not ifo == self.inputIfo and not type == "coh" and not type == "chia":
        second_user_tag = "_" + ifo + "tmplt"
      else:
        second_user_tag = ""
      self.set_user_tag("FOLLOWUP_" + str(trig.eventID) + second_user_tag)
      self.__usertag = "FOLLOWUP_" + str(trig.eventID) + second_user_tag


      # THIS IS A HACK FOR NOW, THERE IS PROBABLY A BETTER WAY TO DO THIS
      if (type == 'head'):
        subBankSize = string.strip(cp.get('followup-inspiral-head','bank-veto-subbank-size'))
        if opts.inspiral_head:
          bankFileName = fu_utils.generateBankVetoBank(trig, ifo, str(trig.gpsTime[ifo]), sngl_table[ifo],int(subBankSize),'BankVetoBank')
        else: bankFileName = 'none'
        self.add_var_opt("bank-veto-subbank-size", string.strip(cp.get('followup-inspiral-head','bank-veto-subbank-size')))
        self.add_var_opt("order", string.strip(cp.get('followup-inspiral-head','order')))
        self.set_bank(bankFileName)


      # the output_file_name is required by the child job (plotSNRCHISQNode)
      if type == "plot" or type == "notrig" or type == "coh" or type == "chia":
        self.output_file_name = inspJob.outputPath + self.inputIfo + "-INSPIRAL_" + self.__ifotag + "_" + self.__usertag + "-" + self.__start + "-" + str(int(self.__end)-int(self.__start)) + extension

      self.set_id(self.inputIfo + "-INSPIRAL_" + self.__ifotag + "_" + self.__usertag + "-" + self.__start + "-" + str(int(self.__end)-int(self.__start)))

      # two cache entries: the xml output and the frame file of the same name
      self.outputCache = self.inputIfo + ' ' + 'INSPIRAL' + ' ' + str(self.__start) + ' ' + str(int(self.__end)-int(self.__start)) + ' ' + self.output_file_name  + '\n' + self.inputIfo + ' ' + 'INSPIRAL-FRAME' + ' ' + str(self.__start) + ' ' + str(int(self.__end)-int(self.__start)) + ' ' + self.output_file_name.replace(extension,".gwf") + '\n'

      self.setupNodeWeb(inspJob,False,None,None,None,dag.cache)
      self.add_var_opt("output-path",inspJob.outputPath)

      if not opts.disable_dag_categories:
        self.set_category(inspJob.name.lower())

      # NOTE(review): eval() is used to read the attribute of opts named by
      # datafindCommand; getattr(opts, datafindCommand) would avoid evaluating
      # an arbitrary string.
      try:
        if d_node.validNode and eval('opts.' + datafindCommand):
          self.add_parent(d_node)
      except:
        print >> sys.stderr, "Didn't find a datafind job, I'll assume I don't need it"

      # the node is only added to the dag when the matching command line
      # option is set; otherwise it is flagged as invalid
      if type == "plot" or type == "notrig":
        if opts.inspiral:
          dag.addNode(self,'inspiral')
          self.validate()
        else: self.invalidate()

      if type == 'head':
        if opts.inspiral_head:
          dag.addNode(self,'inspiral-head')
          self.validate()
        else: self.invalidate()

      if type == 'coh':
        if opts.coh_inspiral:
          dag.addNode(self,'coh-inspiral')
          self.validate()
        else: self.invalidate()

      if type == "chia":
        if opts.plot_chia:
          dag.addNode(self,'chia-inspiral')
          self.validate()
        else: self.invalidate()

    else: #except:
      try:
        print "couldn't add inspiral job for " + self.inputIfo + "@ "+ str(trig.gpsTime[ifo])
        # if self.inputIfo does not exist (happens when inspiral cache and xml files not available), then use ifo in the string.
      except:
        print "couldn't add inspiral job for " + ifo + "@ "+ str(trig.gpsTime[ifo])

  def checkInjections(self,cp):
    """
    Return the stripped value of the option 'injection-file' of the section
    [followup-triggers], or None when it is empty or cannot be read (an
    error message is then printed to stderr).
    """
    try:
      if len(string.strip(cp.get('followup-triggers','injection-file'))) > 0:
        injectionFile = string.strip(cp.get('followup-triggers','injection-file'))
      else:
        injectionFile = None
      return(injectionFile)
    except:
      print >> sys.stderr, "ERROR: failure in followUpInspNode.checkInjections()"
      return None
311 312 ########## PLOT SNR CHISQ TIME SERIES ######################################## 313 ############################################################################### 314 315 ############################################################################## 316 # jobs class for plot snr chisq 317
class plotSNRCHISQJob(pipeline.CondorDAGJob,webTheJob):
  """
  Condor job that plots the snr and chisq time series of a followed-up
  trigger
  """
  # options that must exist in the [condor] section before the job is built
  defaults = {
    "section": "condor",
    "options": {
      "universe": "vanilla",
      "plotsnrchisq": "plotsnrchisq_pipe"
    }
  }

  def __init__(self, options, cp, tag_base='PLOT_FOLLOWUP'):
    """
    options  = not used by this constructor
    cp       = config parser object; missing [condor] defaults are added
    tag_base = second argument handed to setupJobWeb()
    """
    if not verifyCP(cp,self.defaults):
      modifyCP(cp,self.defaults)
    progName = 'plotSNRCHISQJob'
    self.__prog__ = progName
    executable = string.strip(cp.get('condor','plotsnrchisq'))
    self.__executable = executable
    universe = "vanilla"
    self.__universe = universe
    pipeline.CondorDAGJob.__init__(self,universe,executable)
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(progName,tag_base)
340 341 ############################################################################## 342 # node class for plot snr chisq 343
344 -class plotSNRCHISQNode(pipeline.CondorDAGNode,webTheNode):
345 """ 346 Runs an instance of a plotSNRCHISQ followup job 347 """
348 - def __init__(self,job,ifo,fileName,trig,page,dag,inspiralNode,opts,ifoString=None):
349 """ 350 job = A CondorDAGJob that can run an instance of plotSNRCHISQ followup. 351 """ 352 if ifoString: 353 time = trig.gpsTime[ifoString] 354 else: 355 time = trig.gpsTime[ifo] 356 self.friendlyName = 'Plot SNR/CHISQ/PSD' 357 try: 358 pipeline.CondorDAGNode.__init__(self,job) 359 self.output_file_name = "" 360 self.add_var_opt("frame-file",fileName.replace(".xml",".gwf").strip(".gz")) 361 self.add_var_opt("inspiral-xml-file",fileName) 362 363 duration = 2.0 # width of the time series to be displayed 364 self.add_var_opt("plot-width",duration) 365 366 self.add_var_opt("gps",time) 367 self.add_var_opt("gps-start-time",time-duration*.5) 368 self.add_var_opt("gps-end-time",time+duration*.5) 369 370 self.add_var_opt("ifo-times",ifo) 371 self.add_var_opt("ifo-tag","FOLLOWUP_" + ifo) 372 373 if ifoString: 374 self.add_var_opt("user-tag",ifoString+'tmplt_'+str(trig.eventID)) 375 self.id = job.name + '-' + ifo + '-' + ifoString + 'tmplt' + '-' + str(trig.statValue) + '_' + str(trig.eventID) 376 else: 377 self.add_var_opt("user-tag",str(trig.eventID)) 378 self.id = job.name + '-' + ifo + '-' + str(trig.statValue) + '_' + str(trig.eventID) 379 self.setupNodeWeb(job,True, dag.webPage.lastSection.lastSub,page,None,None) 380 381 if not opts.disable_dag_categories: 382 self.set_category(job.name.lower()) 383 384 if inspiralNode.validNode: self.add_parent(inspiralNode) 385 if opts.plots: 386 dag.addNode(self,self.friendlyName) 387 self.validate() 388 else: self.invalidate() 389 except: 390 self.invalidate() 391 print "couldn't add plot job for " + str(ifo) + "@ "+ str(time)
392 393 ############################################################################## 394 # job class for producing the skymap 395
class lalapps_skyMapJob(pipeline.CondorDAGJob,webTheJob):
  """
  Condor job that generates the sky map data
  """
  # options that must exist in the [condor] section before the job is built
  defaults = {
    "section": "condor",
    "options": {
      "universe": "vanilla",
      "lalapps_skymap": "lalapps_skymap"
    }
  }

  def __init__(self, options, cp, tag_base='SKY_MAP'):
    """
    options  = not used by this constructor
    cp       = config parser object; missing [condor] defaults are added.
               The sky resolution ([skymap] ra-res, dec-res) and the sample
               rate ([coh-inspiral] sample-rate) are read from it.
    tag_base = second argument handed to setupJobWeb()
    """
    if not verifyCP(cp,self.defaults):
      modifyCP(cp,self.defaults)
    progName = 'lalapps_skyMapJob'
    self.__prog__ = progName
    executable = string.strip(cp.get('condor','lalapps_skymap'))
    self.__executable = executable
    # this job runs in the standard universe, whatever the defaults say
    universe = "standard"
    self.__universe = universe
    pipeline.CondorDAGJob.__init__(self,universe,executable)
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(progName,tag_base)
    # kept on the job so that the nodes can pick them up
    self.ra_res = string.strip(cp.get('skymap','ra-res'))
    self.dec_res = string.strip(cp.get('skymap','dec-res'))
    self.sample_rate = string.strip(cp.get('coh-inspiral','sample-rate'))
421 422 ############################################################################## 423 # job class for plotting the skymap 424
class pylal_skyPlotJob(pipeline.CondorDAGJob,webTheJob):
  """
  Condor job that plots the sky map written by lalapps_skymap
  """
  # options that must exist in the [condor] section before the job is built
  defaults = {
    "section": "condor",
    "options": {
      "universe": "vanilla",
      "pylal_skyPlotJob": "pylal_plot_inspiral_skymap"
    }
  }

  def __init__(self, options, cp, tag_base='SKY_PLOT'):
    """
    options  = not used by this constructor
    cp       = config parser object; missing [condor] defaults are added.
               The sky resolution ([skymap] ra-res, dec-res) and the sample
               rate ([coh-inspiral] sample-rate) are read from it.
    tag_base = second argument handed to setupJobWeb()
    """
    if not verifyCP(cp,self.defaults):
      modifyCP(cp,self.defaults)
    progName = 'pylal_skyPlotJob'
    self.__prog__ = progName
    executable = string.strip(cp.get('condor','pylal_skyPlotJob'))
    self.__executable = executable
    universe = "vanilla"
    self.__universe = universe
    pipeline.CondorDAGJob.__init__(self,universe,executable)
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(progName,tag_base)
    self.ra_res = string.strip(cp.get('skymap','ra-res'))
    self.dec_res = string.strip(cp.get('skymap','dec-res'))
    self.sample_rate = string.strip(cp.get('coh-inspiral','sample-rate'))
450 451 452 ############################################################################## 453 # node class for producing the skymap 454
class lalapps_skyMapNode(pipeline.CondorDAGNode,webTheNode):
  """
  A C code for computing the sky map
  An example command line is:

  lalapps_skymap --h1-frame-file H1-INSPIRAL_SECOND_H1H2L1V1_FOLLOWUP_866088314000001908-866088022-2048.gwf --l1-frame-file L1-INSPIRAL_SECOND_H1H2L1V1_FOLLOWUP_866088314000001908-866088022-2048.gwf --v1-frame-file V1-INSPIRAL_SECOND_H1H2L1V1_FOLLOWUP_866088314000001908-866088205-2048.gwf --event-id 866088314000001908 --ra-res 512 --dec-res 256 --h1-xml-file H1-INSPIRAL_SECOND_H1H2L1V1_FOLLOWUP_866088314000001908-866088022-2048.xml.gz --l1-xml-file L1-INSPIRAL_SECOND_H1H2L1V1_FOLLOWUP_866088314000001908-866088022-2048.xml.gz --v1-xml-file V1-INSPIRAL_SECOND_H1H2L1V1_FOLLOWUP_866088314000001908-866088205-2048.xml.gz --output-file chad.txt
  """
  def __init__(self,job,trig,opts):
    """
    job  = a lalapps_skyMapJob (ra_res, dec_res and sample_rate are read)
    trig = the trigger (statValue and eventID are read)
    opts = parsed command line options

    Every frame / xml file option is first set to "none"; the real file
    names are filled in later by append_insp_node().
    """
    # ifos for which append_insp_node() accepts an inspiral node
    self.ifo_list = ["H1","L1","V1"]
    #self.already_added_ifo_list = []

    self.ra_res = job.ra_res
    self.dec_res = job.dec_res
    self.sample_rate = job.sample_rate
    pipeline.CondorDAGNode.__init__(self,job)
    self.friendlyName = 'Produce sky map of event'
    self.id = job.name + '-skymap-' + str(trig.statValue) + '_' + str(trig.eventID)
    self.setupNodeWeb(job)
    # required by pylal_skyPlotNode
    # this program now gzips its files (otherwise they are really huge)
    self.output_file_name = job.outputPath + self.id+".txt.gz"
    self.add_var_opt("output-file",self.output_file_name)
    self.add_var_opt("ra-res",self.ra_res)
    self.add_var_opt("dec-res",self.dec_res)
    self.add_var_opt("event-id",trig.eventID)
    for detector in ("h1", "h2", "l1", "v1"):
      self.add_var_opt(detector + "-frame-file","none")
      self.add_var_opt(detector + "-xml-file","none")
    self.add_var_opt("sample-rate",self.sample_rate)

    if not opts.disable_dag_categories:
      self.set_category(job.name.lower())

  def append_insp_node(self,inspNode,ifo):
    """
    Use the output of inspNode as the input of the sky map for ifo, and make
    inspNode a parent of this node when it is valid.  Nothing is done when
    ifo is not in self.ifo_list.
    """
    if ifo not in self.ifo_list:
      return
    fileName = str(inspNode.output_file_name)
    # Only a trailing ".gz" is removed: str.strip(".gz") would strip any of
    # the characters '.', 'g' and 'z' from BOTH ends of the name.
    frameFile = fileName.replace(".xml",".gwf")
    if frameFile.endswith(".gz"):
      frameFile = frameFile[:-len(".gz")]
    self.add_var_opt(ifo.lower()+"-frame-file",frameFile)
    self.add_var_opt(ifo.lower()+"-xml-file",fileName)
    if inspNode.validNode: self.add_parent(inspNode)

  def add_node_to_dag(self,dag,opts,trig):
    """
    Add the node to the dag and validate it when opts.sky_map is set,
    invalidate it otherwise.  trig is not used.
    """
    if opts.sky_map:
      dag.addNode(self,self.friendlyName)
      self.validate()
    else:
      self.invalidate()
509 #print "couldn't add sky map job for " + str(trig.eventID) 510 511 512 513 ############################################################################## 514 # node class for plotting the skymap 515
class pylal_skyPlotNode(pipeline.CondorDAGNode,webTheNode):
  """
  A python code for plotting the sky map
  An example command line is

  /pylal_plot_inspiral_skymap --event-id 866088314000001908 --ra-res 512 --dec-res 256 --output-path . --page-rel-path . --output-web-file test.html --page . --injection-right-ascension 0 --injection-declination 0 --map-data-file chad.txt
  """
  def __init__(self,job,trig,skyMapNode,dag,page,opts):
    """
    job        = the plotting job
    trig       = the trigger (statValue, eventID, ifos are read)
    skyMapNode = node whose output file holds the sky map data; it becomes a
                 parent of this node when it is valid
    dag        = the dag to which the node is added when opts.sky_map_plot
                 is set
    page       = handed to setupNodeWeb()
    opts       = parsed command line options
    """
    # Always initialize the CondorDAGNode
    pipeline.CondorDAGNode.__init__(self,job)

    self.friendlyName = 'Produce a plot of the sky map of an event'

    self.id = ''.join([job.name, '-skymap-plot', str(trig.statValue), '_', str(trig.eventID)])
    # The plots of this node belong to an "event", not to a single ifo
    # trigger, hence they go to the last section of the web page rather than
    # to its last subsection.
    self.setupNodeWeb(job,True, dag.webPage.lastSection,page,None,None)

    # The map data file is the output of the skyMapNode (RA,DEC,Probability)
    plotOptions = (
      ("map-data-file", skyMapNode.output_file_name),
      ("user-tag", str(trig.eventID)),
      ("ifo-tag", trig.ifos),
      ("ifo-times", trig.ifos),
      ("ra-res", str(skyMapNode.ra_res)),
      ("dec-res", str(skyMapNode.dec_res)),
      ("stat-value", str(trig.statValue)),
    )
    for optName, optValue in plotOptions:
      self.add_var_opt(optName,optValue)

    # For a software injection the sky position of the injection is handed
    # to the plotting code, so that it can be marked on the map.
    if trig.is_found():
      simulation = trig.coincs.sim
      self.add_var_opt("injection-right-ascension",str(simulation.longitude))
      self.add_var_opt("injection-declination",str(simulation.latitude))

    if not opts.disable_dag_categories:
      self.set_category(job.name.lower())

    try:
      if skyMapNode.validNode:
        self.add_parent(skyMapNode)
    except: pass

    if not opts.sky_map_plot:
      self.invalidate()
    else:
      dag.addNode(self,self.friendlyName)
      self.validate()
566 567 568 ############### DATAFIND CLASSES ############################################## 569 ############################################################################### 570
571 -class followupDataFindJob(pipeline.LSCDataFindJob,webTheJob):
572 defaults={ 573 "section":"condor", 574 "options": 575 { 576 "universe":"vanilla", 577 "datafind":"ligo_data_find" 578 } 579 } 580
581 - def __init__(self, config_file, source):
582 583 if source == 'futrig': 584 self.name = 'qscanDataFindJob' 585 if source == 'inspiral': 586 self.name = 'inspiralDataFindJob' 587 588 # unfortunately the logs directory has to be created before we call LSCDataFindJob 589 try: 590 os.mkdir(self.name) 591 os.mkdir(self.name + '/logs') 592 except: pass 593 pipeline.LSCDataFindJob.__init__(self, self.name, self.name + '/logs', config_file) 594 if source == 'futrig': 595 self.setup_cacheconv(config_file) 596 self.setupJobWeb(self.name) # this will overwrite the stdout and stderr set up by LSCDataFindJob
597
598 - def setup_cacheconv(self,cp):
599 # create a shell script to call convertlalcache.pl if the value of $RETURN is 0 600 convert_script = open(self.name + '/cacheconv.sh','w') 601 convert_script.write("""#!/bin/bash 602 if [ ${1} -ne 0 ] ; then 603 exit 1 604 else 605 %s ${2} ${3} 606 fi 607 """ % string.strip(cp.get('condor','convertcache'))) 608 convert_script.close() 609 os.chmod(self.name + '/cacheconv.sh',0755)
610 611
class followupDataFindNode(pipeline.LSCDataFindNode,webTheNode):
  """
  Node of a followupDataFindJob.  After construction, validNode tells
  whether the node has been added to the dag and outputFileName holds the
  name of the cache file it produces.
  """

  def __init__(self, job, source, type, cp, time, ifo, opts, dag, datafindCommand, procParams=None):
    """
    job             = the followupDataFindJob this node belongs to
    source          = 'futrig' (qscan datafind) or 'inspiral'
    type            = suffix of the "followup-<type>" section of cp
    cp              = config parser object
    time            = gps time of the trigger (used for 'futrig' only)
    ifo             = name of the ifo; its first letter is the observatory
    opts            = parsed command line options
    dag             = the dag to which the node is added
    datafindCommand = name of the attribute of opts that enables the node
    procParams      = process params of the inspiral job ('inspiral' only)

    Any failure is caught: the node is then flagged as invalid and a message
    is printed to stderr.
    """
    try:
      self.outputFileName = ""
      pipeline.LSCDataFindNode.__init__(self,job)
      self.id = str(ifo) + '-' + repr(time) + '-' + str(type)
      self.setupNodeWeb(job,False,None,None,None,dag.cache)
      if source == 'futrig':
        self.outputFileName = self.setup_fu_trig(job, cp, time, ifo, type)
        nodeName = "qscan data find"
      if source == 'inspiral':
        self.outputFileName = self.setup_inspiral(cp,ifo,type,procParams)
        nodeName = "inspiral data find"

      if not opts.disable_dag_categories:
        self.set_category(job.name.lower())

      # if the selected "ifo" needs to be done remotely (this is the case for
      # Virgo qscan datafind) do not add the node to the dag
      # NOTE(review): eval() only reads the attribute of opts named by
      # datafindCommand; getattr(opts, datafindCommand) would be safer.
      if eval('opts.' + datafindCommand) and \
        not( cp.has_option("followup-"+type,"remote-ifo") and \
        cp.get("followup-"+type,"remote-ifo")==ifo ):
        dag.addNode(self,nodeName)
        self.validNode = True
      else: self.validNode = False
    except:
      self.validNode = False
      print >> sys.stderr, "could not set up the datafind jobs for " + type

  def setup_inspiral(self,cp,ifo,type,procParams):
    """
    Set up the datafind for an inspiral job: the interval is the one of the
    process params (gps-start-time, gps-end-time) extended by pad-data on
    both sides.  Returns the name of the output cache of the node.
    """
    for row in procParams:
      param = row.param.strip("-")
      value = row.value
      if param == 'gps-start-time': startTime = value
      if param == 'gps-end-time': endTime = value
      if param == 'pad-data': paddataTime = value
    self.set_observatory(ifo[0])
    self.set_start(int(startTime) - int(paddataTime))
    self.set_end(int(endTime) + int(paddataTime))
    self.set_type(cp.get("followup-"+type,ifo + '_type'))
    lalCache = self.get_output()
    return(lalCache)

  def setup_fu_trig(self, job, cp, time, ifo, type):
    """
    Set up the datafind for a qscan: the interval is centred on time and a
    post script converts the lal cache into a qscan cache.  Returns the name
    of the qscan cache.
    """
    # 1s is subtracted from the expected startTime (and added to the end
    # time) to make sure the window will be large enough. This is to be sure
    # to handle the rounding to the next sample done by qscan.
    self.q_time = cp.getint("followup-"+type,'search-time-range')/2
    self.set_observatory(ifo[0])
    self.set_start(int( time - self.q_time - 1))
    self.set_end(int( time + self.q_time + 1))
    if cp.has_option("followup-"+type, ifo + '_type'):
      self.set_type( cp.get("followup-"+type, ifo + '_type' ))
    else:
      if not( cp.has_option("followup-"+type,"remote-ifo") and \
      cp.get("followup-"+type,"remote-ifo")==ifo ):
        self.set_type( cp.get("followup-"+type, 'type' ))
      else: self.set_type("dummy")
    lalCache = self.get_output()
    # Replace the extension "cache" by "qcache".  str.rstrip("cache") strips
    # a set of characters rather than a suffix and could eat the end of the
    # file stem, so the suffix is removed explicitly.
    if lalCache.endswith("cache"):
      cacheStem = lalCache[:-len("cache")]
    else:
      cacheStem = lalCache
    qCache = cacheStem + "qcache"
    self.set_post_script(job.name + "/cacheconv.sh $RETURN %s %s" %(lalCache,qCache) )
    return(qCache)
675 676 ############################################################################## 677 # qscan class for qscan jobs 678
679 -class qscanJob(pipeline.CondorDAGJob, webTheJob):
680 """ 681 A qscan job 682 """ 683 defaults={ 684 "section":"condor", 685 "options":{ 686 "universe":"vanilla", 687 "qscan":"wpipeline" 688 } 689 } 690
691 - def __init__(self, opts, cp, tag_base='QSCAN'):
692 """ 693 """ 694 if not(verifyCP(cp,self.defaults)): 695 modifyCP(cp,self.defaults) 696 self.__executable = string.strip(cp.get('condor','qscan')) 697 self.__universe = "vanilla" 698 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 699 self.setupJobWeb(tag_base,None,cp) 700 self.setup_checkForDir()
701
702 - def setup_checkForDir(self):
703 # create a shell script to check for the existence of the qscan output directory and rename it if needed 704 checkdir_script = open(self.name + '/checkForDir.sh','w') 705 checkdir_script.write("""#!/bin/bash 706 if [ -d $1/$2 ] 707 then 708 matchingList=$(echo $(find $1 -name $2.bk*)) 709 COUNTER=1 710 for file in $matchingList 711 do 712 let COUNTER=COUNTER+1 713 done 714 mv $1/$2 $1/$2.bk.$COUNTER 715 fi 716 """) 717 checkdir_script.close() 718 os.chmod(self.name + '/checkForDir.sh',0755)
719 720 ############################################################################## 721 # qscan class for qscan Node 722
class qscanNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of a qscan job
  """
  def __init__(self,job,time,cp,qcache,ifo,name, opts, d_node, dag, datafindCommand, qscanCommand, trig=None,qFlag=None):
    """
    job             = A CondorDAGJob that can run an instance of qscan.
    time            = gps time of the scan; repr(time) is used in the
                      arguments, the id and the output path
    cp              = config parser object; the section "followup-<name>" is
                      read
    qcache          = cache file handed to the scan with -f
    ifo             = prefix of the options read from that section
                      (<ifo>config-file, <ifo>output, <ifo>web)
    name            = friendly name of the node and suffix of the section
    opts            = parsed command line options
    d_node          = datafind node, made a parent of this node when valid
    dag             = the dag to which the node is added
    datafindCommand = name of the attribute of opts that enables datafind
    qscanCommand    = name of the attribute of opts that enables this scan
    trig, qFlag     = not used by this constructor
    """
    self.friendlyName = name
    self.id = ifo + '-' + name + '-' + repr(time)

    pipeline.CondorDAGNode.__init__(self,job)
    # the -r flag is left out when the name starts with "background"
    if name.split('-')[0]=='background':
      self.add_var_arg('scan')
    else:
      self.add_var_arg('scan -r')
    qscanConfig = string.strip(cp.get("followup-"+name, ifo + 'config-file'))
    self.add_var_arg("-c "+qscanConfig)
    self.add_var_arg("-f "+qcache)

    # output directory: taken from the ini file if given and not empty
    if cp.has_option("followup-"+name, ifo + 'output') and string.strip(cp.get("followup-"+name, ifo + 'output')):
      output = string.strip(cp.get("followup-"+name, ifo + 'output'))
    else:
      #output = dag.publish_path + '/' + job.name + '/' + name + '/' + ifo
      output = job.name + '/' + name + '/' + ifo
    # create the output directory, or give up if it exists but is read-only
    if not os.access(output,os.F_OK):
      os.makedirs(output)
    else:
      if not os.access(output,os.W_OK):
        print >> sys.stderr, 'path '+output+' is not writable'
        sys.exit(1)

    self.add_var_arg("-o "+output+"/"+repr(time))
    self.add_var_arg(repr(time))

    # the pre script is given the output directory and the scan time
    self.set_pre_script(job.name + "/checkForDir.sh %s %s" \
    %(output, repr(time)))

    #get the absolute output path whatever the path might be in the ini file
    #absoutput = os.path.abspath(output)

    #self.outputName = absoutput + '/' + repr(time) # redirect output name
    self.outputName = output + '/' + repr(time)

    #prepare the string for the output cache
    self.outputCache = ifo + ' ' + name + ' ' + repr(time) + ' ' + self.outputName + '\n'

    #extract web output from the ini file if the job is QSCAN
    if job.name == 'QSCAN':
      if cp.has_option("followup-"+name,ifo+'web') and string.strip(cp.get("followup-"+name,ifo+'web')):
        pageOverride = string.strip(cp.get("followup-"+name,ifo+'web'))+'/'+repr(time)
      else:
        #pageOverride = dag.page + '/' + job.name + '/' + name + '/' + ifo + '/' + repr(time)
        pageOverride = job.name + '/' + name + '/' + ifo + '/' + repr(time)
      self.setupNodeWeb(job,False,dag.webPage.lastSection.lastSub,dag.page,pageOverride,dag.cache)

    else:
      self.setupNodeWeb(job,False,None,None,None,dag.cache)

    # This command will force Condor to see the qscan jobs successful even
    # they fail. This is useful when the followups are rerun on candidates
    # already analysed, since when a qscan directory exists, the qscan job
    # will fail. By setting the post_script to true the qscan job will
    # still be reported as successful, so that an analyseQscan job can be run
    # immediately after.
    # self.set_post_script("/bin/true")

    if not opts.disable_dag_categories:
      self.set_category(job.name.lower())

    # only add a parent if it exists
    # NOTE(review): eval() only reads the attribute of opts named by
    # datafindCommand / qscanCommand; getattr() would avoid evaluating an
    # arbitrary string.
    try:
      if d_node.validNode and eval('opts.' + datafindCommand):
        self.add_parent(d_node)
    except: pass

    # if the selected "ifo" needs to be done remotely (this is the case for
    # Virgo qscans) do not add the node to the dag
    # NOTE(review): validNode is not assigned here for a remote node
    if eval('opts.' + qscanCommand):
      if not(cp.has_option("followup-"+name,"remote-ifo") and \
      cp.get("followup-"+name,"remote-ifo")==ifo):
        dag.addNode(self,self.friendlyName)
        self.validNode = True
      else:
        dag.remote_nodes.append(self)
    else: self.validNode = False
809 # except: 810 # self.validNode = False 811 # print >> sys.stderr, "could not set up the qscan job for " + self.id 812 813 814 ############################################################################## 815 # class for remote qscan jobs 816
class remoteQscanJob(pipeline.CondorDAGJob, webTheJob):
  """
  A remote qscan job

  The condor executable is not the scan program itself but a small bash
  wrapper (remote_scan_wrapper.sh) that this class writes into the tag_base
  directory. The wrapper sources the file given as its first argument and
  then runs the executable given as its second argument with the remaining
  six arguments mapped onto named command line options.
  """
  defaults={
    "section":"condor",
    "options":{
      "universe":"vanilla",
      "submit_remote_scan":"submit_remote_scan.py"
      }
    }

  def __init__(self, opts, cp, tag_base='REMOTESCAN'):
    """
    opts = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails
    tag_base = directory that receives the wrapper script (created if it
               does not exist); also passed to setupJobWeb
    """
    if not(verifyCP(cp,self.defaults)):
      modifyCP(cp,self.defaults)

    if not os.path.exists(tag_base):
      os.mkdir(tag_base)
    self.setup_executable(tag_base)
    self.__executable = tag_base + '/remote_scan_wrapper.sh'
    # NOTE: the job always runs in the scheduler universe; the "universe"
    # entry of self.defaults is not read here.
    self.__universe = "scheduler"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
    self.setupJobWeb(tag_base,None)

  def setup_executable(self,tag_base):
    """
    Write tag_base/remote_scan_wrapper.sh and make it executable.

    tag_base = existing directory in which the script is created
    """
    script_path = tag_base + '/remote_scan_wrapper.sh'
    starter_script = open(script_path,'w')
    # try/finally so the handle is closed even if the write fails
    try:
      starter_script.write("""#!/bin/bash
dotbashrc=$1
executable=$2
gpstime=$3
configfile=$4
qscantype=$5
remoteoutput=$6
remotereceiver=$7
outputpath=$8
shift 8
source $dotbashrc
$executable --gps-time $gpstime --config-file $configfile --qscan-type $qscantype --remote-output $remoteoutput --remote-receiver $remotereceiver --output-path $outputpath
""")
    finally:
      starter_script.close()
    # rwxr-xr-x, spelled through int() so the constant parses on any Python
    os.chmod(script_path,int('755',8))
860 861 862 ############################################################################## 863 # class for remote qscan Node 864
class remoteQscanFgNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of a remote qscan job
  """
  def __init__(self,job,time,cp,ifo,name,opts,dag,qscanCommand):
    """
    job = A CondorDAGJob that can run an instance of remote qscan.
    time = trigger time; repr(time) is used in the node id, as an argument
           and in the output directory
    cp = configuration parser; the sections "followup-remote-scan", "condor"
         and "followup-" + name are read
    ifo = interferometer name, used in the id, in the output directory and
          as prefix of the "config-file" option name
    name = scan name; the first dash-separated word is dropped to build the
           qscan type argument
    opts = options object; disable_dag_categories and the attribute named by
           qscanCommand are read
    dag = DAG that receives the node when the option is set
    qscanCommand = name of the opts attribute that switches this node on
    """
    self.friendlyName = name
    self.id = ifo + '-' + name + '-' + repr(time)
    pipeline.CondorDAGNode.__init__(self,job)
    self.add_macro("macroid", self.id)
    self.jobName = job.name

    # The arguments are positional and are added in the order in which
    # remote_scan_wrapper.sh reads them ($1 ... $8).
    self.add_var_arg(cp.get("followup-remote-scan","virgo-env-path").strip())
    self.add_var_arg(cp.get("condor","submit_remote_scan").strip())
    self.add_var_arg(repr(time))
    self.add_var_arg(cp.get("followup-"+name,ifo+"config-file").strip())
    # qscan type: the name without its first word, dashes turned into "_"
    self.add_var_arg("_".join(name.split("-")[1:]))
    self.add_var_arg(cp.get("followup-remote-scan","remote-output").strip())
    self.add_var_arg(cp.get("followup-remote-scan","remote-server").strip())

    outputdir = 'QSCAN' + '/' + name + '/' + ifo + '/' + repr(time)
    self.add_var_arg(outputdir)

    if not opts.disable_dag_categories:
      self.set_category(job.name.lower())

    # Plain attribute lookup instead of eval('opts.' + qscanCommand)
    if getattr(opts,qscanCommand):
      dag.addNode(self,"Remote " + self.friendlyName)
      self.validate()
    else:
      self.invalidate()
897 898 899 ############################################################################## 900 # distributeQscanJob class: the job 901
class distributeQscanJob(pipeline.CondorDAGJob, webTheJob):
  """
  Condor job that distributes the results of the qscans that have been run
  remotely (for LV search)
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      distribute_q="distrib_fu_qscan_results.py"
    )
  )

  def __init__(self,cp):
    """
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "distribute_q" option of the "condor" section.
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'distributeQscanJob'
    self.__executable = cp.get('condor','distribute_q').strip()
    self.__universe = "vanilla"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)

    # export the submit environment to the job
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(self.__prog__)
925 926 ############################################################################## 927 # distributeQscanNode class: the node 928
class distributeQscanNode(pipeline.CondorDAGNode, webTheNode):
  """
  DAG node that distributes the results of the qscans that have been run
  remotely (for LV search)
  """
  def __init__(self,job,foregroundCache,backgroundCache,ifo,inputFile,opts,dag):
    """
    job = the CondorDAGJob this node is an instance of
    foregroundCache = value of the qscan-cache-foreground option
    backgroundCache = value of the qscan-cache-background option
    ifo = value of the remote-ifo option
    inputFile = value of the qscan-input-file option
    opts = options object; distrib_remote_q decides whether the node is
           added to the dag
    dag = DAG that receives the node
    """
    self.friendlyName = "distributeQscanResults"

    pipeline.CondorDAGNode.__init__(self,job)

    # (option, value) pairs, registered in this order
    nodeOptions = (
      ('qscan-input-file', inputFile),
      ('qscan-cache-background', backgroundCache),
      ('qscan-cache-foreground', foregroundCache),
      ('remote-ifo', ifo),
      ('qscan-type-list', ",".join(("qscan", "seismic-qscan")))
    )
    for optName, optValue in nodeOptions:
      self.add_var_opt(optName, optValue)

    if not opts.distrib_remote_q:
      self.validNode = False
    else:
      dag.addNode(self,self.friendlyName)
      self.validNode = True
952 953 ############################################################################## 954 # analyse qscan class: the job 955
class analyseQscanJob(pipeline.CondorDAGJob, webTheJob):
  """
  Followup job that runs analyseQscan to interpret the qscans
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      analyseQscan="analyseQscan.py"
    )
  )

  def __init__(self,options,cp,tag_base='ANALYSE_QSCAN'):
    """
    options = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "analyseQscan" option of the "condor" section.
    tag_base = second argument handed to setupJobWeb
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'analyseQscanJob'
    self.__executable = cp.get('condor','analyseQscan').strip()
    self.__universe = "vanilla"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)

    # export the submit environment to the job
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(self.__prog__,tag_base)
976 977 ############################################################################## 978 # analyse qscan class: the node 979
class analyseQscanNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of a followup analyseQscan job
  """
  def __init__(self,job,time,ifo,name,foregroundCache,backgroundCache,cp,opts,dag,command):
    """
    job = A CondorDAGJob that can run an instance of analyseQscan followup.
    time = trigger time; repr(time) is used in the id, gps-string and
           user-tag
    ifo = interferometer name
    name = qscan type name; also selects the "followup-" + name section used
           for the remote-ifo test
    foregroundCache = value of the qscan-cache-foreground option
    backgroundCache = value of the qscan-cache-background option
    cp = configuration parser (section "followup-analyse-qscan" is read)
    opts = options object; disable_dag_categories and the attribute named by
           command are read
    dag = DAG that is searched for parent nodes and receives this node
    command = name of the opts attribute that switches this node on

    Any failure while setting up the node is reported on stdout and leaves
    the node with validNode = False instead of raising.
    """
    self.friendlyName = 'analyse ' + name
    self.id = ifo + '-' + name + '-' + repr(time)

    # Name without its first dash-separated word, each remaining word
    # followed by a dash: "foreground-seismic-qscan" -> "seismic-qscan-"
    shortName = ''.join([word + '-' for word in name.split('-')[1:]])

    section = 'followup-analyse-qscan'

    try:
      pipeline.CondorDAGNode.__init__(self,job)
      if cp.has_option(section,'generate-qscan-xml'):
        self.add_var_opt('generate-qscan-xml','')
      self.add_var_opt('z-threshold',cp.getfloat(section,'z-threshold'))
      if cp.has_option(section,'plot-z-distribution'):
        self.add_var_opt('plot-z-distribution','')
        self.add_var_opt('z-min',cp.getfloat(section,'z-min'))
        self.add_var_opt('z-max',cp.getfloat(section,'z-max'))
        self.add_var_opt('z-bins',cp.getfloat(section,'z-bins'))
      if cp.has_option(section,'plot-dt-distribution'):
        self.add_var_opt('plot-dt-distribution','')
        self.add_var_opt('dt-min',cp.getfloat(section,shortName + 'dt-min'))
        self.add_var_opt('dt-max',cp.getfloat(section,shortName + 'dt-max'))
        self.add_var_opt('dt-bins',cp.getfloat(section,'dt-bins'))
      if cp.has_option(section,'plot-z-scattered'):
        self.add_var_opt('plot-z-scattered','')
      if cp.has_option(section,'plot-z-scattered') or cp.has_option(section,'plot-dt-distribution'):
        # The ref-channel option is a comma separated pair; V1 uses the
        # second entry, every other ifo the first one.
        refChannels = cp.get(section,shortName + 'ref-channel').split(',')
        if not ifo=='V1':
          refChannel = refChannels[0].strip()
        else:
          refChannel = refChannels[1].strip()
        self.add_var_opt('ref-channel',refChannel)
      self.add_var_opt('ifo-times',ifo)
      self.add_var_opt('type',name)
      self.add_var_opt('gps-string',repr(time))
      self.add_var_opt('ifo-tag',ifo)
      self.add_var_opt('user-tag',repr(time).replace('.','_') + "_" + shortName.replace('-','_').strip("_"))

      self.add_var_opt('qscan-cache-foreground',foregroundCache)
      self.add_var_opt('qscan-cache-background',backgroundCache)

      self.setupNodeWeb(job,True,None,dag.page,None,None)

      if not opts.disable_dag_categories:
        self.set_category(job.name.lower())

      # True when this ifo is the one configured as "remote-ifo" for this
      # scan type; evaluated once instead of once per dag node.
      isRemoteIfo = cp.has_option("followup-"+name,"remote-ifo") and \
        cp.get("followup-"+name,"remote-ifo")==ifo

      # add the parents to this node
      for node in dag.get_nodes():
        # if node distributeQscanNode is valid and if remote ifo is analysed,
        # add distributeQscanNode as parent
        if isinstance(node,distributeQscanNode):
          if isRemoteIfo and node.validNode:
            self.add_parent(node)
        # if node remoteQscanFgNode is valid and if remote ifo is analysed,
        # add remoteQscanFgNode as parent
        if isinstance(node,remoteQscanFgNode):
          if isRemoteIfo and node.friendlyName == name and node.validNode:
            self.add_parent(node)
        # add all qscan nodes of the same type as parents
        if isinstance(node,qscanNode):
          if node.validNode:
            sameType = node.friendlyName == name or \
              node.friendlyName.replace('background','foreground') == name
            if sameType and node.id.split('-')[0] == ifo:
              self.add_parent(node)

      # Plain attribute lookup instead of eval('opts.' + command)
      if getattr(opts,command):
        dag.addNode(self,self.friendlyName)
        self.validNode = True
      else:
        self.validNode = False

    except Exception:
      # Best effort: a node that cannot be configured is dropped, but
      # KeyboardInterrupt / SystemExit are no longer swallowed here.
      self.validNode = False
      print("couldn't add " + name + " analyseQscan job for " + ifo + "@ "+ repr(time))
1080 1081 ############################################################################## 1082 # class for h1h2 qevent jobs 1083
class h1h2QeventJob(pipeline.CondorDAGJob, webTheJob):
  """
  A h1h2 qevent job
  """
  defaults={
    "section":"condor",
    "options":{
      "universe":"vanilla",
      "qscan":"wpipeline"
      }
    }

  def __init__(self, opts, cp):
    """
    opts = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "qscan" option of the "condor" section.
    """
    if not(verifyCP(cp,self.defaults)):
      modifyCP(cp,self.defaults)
    self.name = 'h1h2QeventJob'
    self.__executable = cp.get('condor','qscan').strip()
    self.__universe = "vanilla"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
    self.setupJobWeb(self.name)
    # presumably setupJobWeb creates the self.name directory the script is
    # written into -- TODO confirm in webCondor
    self.setup_cacheAndCheckDir()

  def setup_cacheAndCheckDir(self):
    """
    Write self.name/checkForDir.sh and make it executable.

    The script takes five arguments. It concatenates the files $1 and $2
    into $3, then, if the directory $4/$5 exists, renames it to
    $4/$5.bk.<N>, where N is one more than the number of entries that find
    reports for the pattern $5.bk* under $4.
    """
    script_path = self.name + '/checkForDir.sh'
    checkdir_script = open(script_path,'w')
    # try/finally so the handle is closed even if the write fails
    try:
      checkdir_script.write("""#!/bin/bash
cat ${1} ${2} > ${3}
if [ -d $4/$5 ]
then
matchingList=$(echo $(find $4 -name $5.bk*))
COUNTER=1
for file in $matchingList
do
let COUNTER=COUNTER+1
done
mv $4/$5 $4/$5.bk.$COUNTER
fi
""")
    finally:
      checkdir_script.close()
    # rwxr-xr-x, spelled through int() so the constant parses on any Python
    os.chmod(script_path,int('755',8))
1136 1137 ############################################################################# 1138 # class for h1h2 qevent Node 1139
class h1h2QeventNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of a qscan job
  """
  def __init__(self,job,dNode,times,ifoList,name,cp,opts,dag,qeventCommand):
    """
    job = A CondorDAGJob that can run an instance of H1H2 qevent.
    dNode = mapping ifo -> node providing outputFileName and validNode
            (presumably the datafind nodes -- confirm against the caller)
    times = mapping ifo -> trigger time; only times[ifoList[0]] is used
    ifoList = list of ifo names; the pre-script uses the first two entries
    name = name of the followup; selects the "followup-" + name section
    cp = configuration parser
    opts = options object; disable_dag_categories and the attribute named by
           qeventCommand are read
    dag = DAG that receives the node
    qeventCommand = name of the opts attribute that switches this node on

    Exits the program with status 1 when the output directory exists but is
    not writable.
    """
    ifoString = ''.join(ifoList)
    firstIfo = ifoList[0]
    section = "followup-"+name

    self.friendlyName = name
    # NOTE(review): the id uses str() of the time while every path and
    # argument below uses repr(); kept as is because other code may match
    # on the id.
    self.id = ifoString + '-' + name + '-' + str(times[firstIfo])

    pipeline.CondorDAGNode.__init__(self,job)

    # The cache type is the second dash-separated field of the first ifo's
    # output file name with its first three characters dropped.
    cache_type = dNode[firstIfo].outputFileName.split('-')[1][3:]
    cache_start = []
    cache_end = []
    for ifo in ifoList:
      fields = dNode[ifo].outputFileName.split('.')[0].split('-')
      cache_start.append(fields[2])
      cache_end.append(fields[-1])

    # max/min compare the fields as strings, as in the original code
    qeventcache = job.name + '/' + ifoString + '_' + cache_type + '-' + \
      str(max(cache_start)) + '-' + str(min(cache_end)) + '.qcache'

    if cp.has_option(section, ifoString + '-output') and cp.get(section, ifoString + '-output').strip():
      output = cp.get(section, ifoString + '-output').strip()
    else:
      output = job.name + '/' + name + '/' + ifoString
    if not os.access(output,os.F_OK):
      os.makedirs(output)
    elif not os.access(output,os.W_OK):
      sys.stderr.write('path '+output+' is not writable' + '\n')
      sys.exit(1)

    gpsString = repr(times[firstIfo])

    self.add_var_arg('event')
    qeventConfig = cp.get(section, ifoString + '-config-file').strip()
    self.add_var_arg('-p '+qeventConfig)
    self.add_file_arg('-f '+qeventcache)
    self.add_var_arg('-o '+output+'/'+gpsString)
    self.add_var_arg(gpsString)
    eventDuration = cp.get(section, 'duration').strip()
    self.add_var_arg(eventDuration)

    # checkForDir.sh merges the two input caches into qeventcache and moves
    # an already existing output directory out of the way.
    self.set_pre_script(job.name + "/checkForDir.sh %s %s %s %s %s" \
    %(dNode[ifoList[0]].outputFileName, dNode[ifoList[1]].outputFileName, \
    qeventcache, output, gpsString))

    #get the absolute output path whatever the path might be in the ini file
    absoutput = os.path.abspath(output)
    self.outputName = absoutput + '/' + gpsString # redirect output name

    #prepare the string for the output cache
    self.outputCache = ifoString + ' ' + name + ' ' + gpsString + ' ' + self.outputName + '\n'

    if cp.has_option(section,ifoString+'-web') and cp.get(section,ifoString+'-web').strip():
      pageOverride = cp.get(section,ifoString+'-web').strip()+'/'+gpsString
    else:
      pageOverride = job.name + '/' + name + '/' + ifoString + '/' + gpsString
    self.setupNodeWeb(job,False,dag.webPage.lastSection.lastSub,dag.page,pageOverride,dag.cache)

    if not opts.disable_dag_categories:
      self.set_category(job.name.lower())

    # only valid dNode entries become parents
    for ifo in ifoList:
      if dNode[ifo].validNode:
        self.add_parent(dNode[ifo])

    # Plain attribute lookup instead of eval('opts.' + qeventCommand)
    if getattr(opts,qeventCommand):
      dag.addNode(self,self.friendlyName)
      self.validNode = True
    else:
      self.validNode = False
1225 1226 1227 ############################################################################### 1228 # FrCheck Jobs and Nodes 1229
class FrCheckJob(pipeline.CondorDAGJob, webTheJob):
  """
  Followup job that checks frames
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      frame_check="frame_check"
    )
  )

  def __init__(self, options, cp, tag_base='FRCHECK'):
    """
    options = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "frame_check" option of the "condor" section.
    tag_base = second argument handed to setupJobWeb
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'FrCheckJob'
    self.__executable = cp.get('condor','frame_check').strip()
    self.__universe = "vanilla"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)

    # export the submit environment to the job
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(self.__prog__,tag_base)
1252 1253
class FrCheckNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of a FrCheck followup job
  """
  def __init__(self, FrCheckJob, procParams, ifo, trig, cp, opts, dag, datafindCache, d_node, datafindCommand):
    """
    FrCheckJob = the CondorDAGJob this node is an instance of
    procParams = rows with .param / .value attributes; searched for the
                 "frame-cache" parameter when checkHipeCachePath(cp) is true
    ifo = interferometer name
    trig = trigger; eventID, statValue and gpsTime[ifo] are read
    cp = configuration parser
    opts = options object; disable_dag_categories, frame_check and the
           attribute named by datafindCommand are read
    dag = DAG that receives the node
    datafindCache = cache file used when checkHipeCachePath(cp) is false
    d_node = candidate parent node (may lack validNode)
    datafindCommand = name of the opts attribute that must be set for d_node
                      to be added as parent

    Any failure while setting up the node is reported on stdout and the
    node is invalidated instead of raising.
    """
    try:
      hipeCache = checkHipeCachePath(cp)

      if not hipeCache:
        cacheFile = datafindCache
      else:
        cacheFile = None
        # the last "frame-cache" row wins, as before
        for row in procParams:
          if row.param.strip("-") == 'frame-cache':
            cacheFile = row.value
        if cacheFile is None:
          # used to surface as a NameError further down; same outcome
          # (node invalidated) but with an explicit reason
          raise ValueError("no frame-cache entry in the process params")

      self.friendlyName = 'Frame Check'

      pipeline.CondorDAGNode.__init__(self,FrCheckJob)
      self.add_var_opt("frame-cache", cacheFile)
      self.add_var_opt("frame-check-executable", cp.get('followup-frameCheck','executable').strip())
      self.add_var_opt("ifo-times",ifo)
      self.add_var_opt("ifo-tag","FOLLOWUP_"+ifo)
      self.add_var_opt("user-tag",trig.eventID)
      self.id = FrCheckJob.name + '-' + ifo + '-' + str(trig.statValue) + '_' + str(trig.eventID)
      self.setupNodeWeb(FrCheckJob,True, dag.webPage.lastSection.lastSub,dag.page,None,None)

      if not opts.disable_dag_categories:
        self.set_category(FrCheckJob.name.lower())

      # Only add a parent if it exists. Deliberately best effort: any
      # problem with d_node or the option lookup just means "no parent".
      try:
        if d_node.validNode and getattr(opts,datafindCommand):
          self.add_parent(d_node)
      except Exception:
        pass

      if opts.frame_check:
        dag.addNode(self,self.friendlyName)
        self.validate()
      else:
        self.invalidate()

    except Exception:
      # KeyboardInterrupt / SystemExit are no longer swallowed here
      self.invalidate()
      print("couldn't add frame check job for " + str(ifo) + "@ "+ str(trig.gpsTime[ifo]))
1298
class IFOstatus_checkJob(pipeline.CondorDAGJob, webTheJob):
  """
  Followup job that downloads summary plots
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      IFOstatus_check="IFOstatus_check"
    )
  )

  def __init__(self, options, cp, tag_base='IFOSTATUS'):
    """
    options = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "IFOstatus_check" option of the "condor" section.
    tag_base = second argument handed to setupJobWeb
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'IFOstatus_checkJob'
    self.__executable = cp.get('condor','IFOstatus_check').strip()
    # this job runs in the local universe, whatever self.defaults says
    self.__universe = "local"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)

    # export the submit environment to the job
    self.add_condor_cmd('getenv','True')
    self.setupJobWeb(self.__prog__,tag_base)
1319
class IFOstatus_checkNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of an IFOstatus_check followup job
  """
  def __init__(self, IFOstatus_checkJob, ifo, trig, cp,opts,dag):
    """
    IFOstatus_checkJob = the CondorDAGJob this node is an instance of
    ifo = interferometer name
    trig = trigger; gpsTime[ifo], eventID and statValue are read
    cp = configuration parser (not used by this constructor)
    opts = options object; disable_dag_categories and ifo_status_check are
           read
    dag = DAG that receives the node when opts.ifo_status_check is set
    """
    statusJob = IFOstatus_checkJob

    self.friendlyName = 'IFO status summary plots'
    pipeline.CondorDAGNode.__init__(self,statusJob)

    # (option, value) pairs, registered in this order
    nodeOptions = (
      ("ifo-times", ifo),
      ("gps-time", trig.gpsTime[ifo]),
      ("ifo-tag", "FOLLOWUP_"+ifo),
      ("user-tag", str(trig.eventID))
    )
    for optName, optValue in nodeOptions:
      self.add_var_opt(optName, optValue)

    idTail = str(trig.statValue) + '_' + str(trig.eventID)
    self.id = statusJob.name + '-' + str(ifo) + '-' + idTail
    self.setupNodeWeb(statusJob,True, dag.webPage.lastSection.lastSub,dag.page,None,None)

    if not opts.disable_dag_categories:
      self.set_category(statusJob.name.lower())

    if not opts.ifo_status_check:
      self.invalidate()
    else:
      dag.addNode(self,self.friendlyName)
      self.validate()
1342 1343 ############################################################################## 1344
class followupoddsJob(pipeline.CondorDAGJob, webTheJob):
  """
  Model selection job
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      followupodds="lalapps_inspnest"
    )
  )

  def __init__(self,options,cp,tag_base='FOLLOWUPODDS'):
    """
    options = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "followupodds" option of the "condor" section.
    tag_base = second argument handed to setupJobWeb
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'followupoddsjob'
    self.__executable = cp.get('condor','followupodds').strip()
    # this job runs in the standard universe, whatever self.defaults says
    self.__universe = "standard"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
    self.setupJobWeb(self.__prog__,tag_base)
1366
class followupoddsNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs an instance of the model selection followup job
  """
  def __init__(self,followupoddsJob,procParamsTable,trig,randomseed,cp,opts,dag):
    """
    followupoddsJob = the CondorDAGJob this node is an instance of
    procParamsTable = mapping ifo -> rows with .param / .value attributes;
                      "frame-cache", "gps-start-time" and "gps-end-time"
                      are read
    trig = trigger; ifolist_in_coinc, ifos, statValue, eventID and gpsTime
           are read
    randomseed = sequence whose first element is a string seed
    cp = configuration parser ("followup-odds" and "followup-coh-trigbank"
         sections are read)
    opts = options object; opts.odds switches the node on
    dag = DAG that receives the node

    The node is only added when opts.odds is set and the analysed stretch
    (common data span shrunk by 64 s on each side) is at least 24 s long.
    Any failure while setting up the node is reported on stdout and the
    node is invalidated instead of raising.
    """
    # Bound before the try block so the error handler can always refer to
    # it, even when the failure happens before the loop over the ifos.
    ifo = None
    try:
      IFOs = trig.ifolist_in_coinc
      time_prior = cp.get('followup-odds','time_prior').strip()
      Nlive = cp.get('followup-odds','min-live').strip()
      # Nmcmc is not forwarded to the job; the lookup is kept so that a
      # configuration without it still invalidates the node as before.
      cp.get('followup-odds','Nmcmc')
      srate = cp.get('followup-odds','sample_rate').strip()
      Approximant = cp.get('followup-odds','approximant').strip()
      self.friendlyName = 'Odds followup job'
      pipeline.CondorDAGNode.__init__(self,followupoddsJob)
      GPSstarts=[]
      GPSends=[]
      for ifo in IFOs:
        for row in procParamsTable[ifo]:
          param=row.param.strip("-")
          value=row.value
          if param == 'frame-cache': cacheFile=value
          if param == 'gps-start-time':
            GPSstarts.append(float(value))
          if param == 'gps-end-time':
            GPSends.append(float(value))
        # NOTE(review): cacheFile is not reset per ifo, so an ifo without a
        # frame-cache row reuses the previous ifo's cache -- confirm this
        # is intended.
        self.add_var_arg("--IFO "+str(ifo))
        self.add_var_arg("--cache " +str(cacheFile))

      # Common stretch of data, shrunk by 64 s on each side. The values go
      # through str() first, exactly as they are passed to the job.
      GPSstart=str(max(GPSstarts)+64)
      GPSend=str(min(GPSends)-64)
      length=float(GPSend)-float(GPSstart)

      outputname = followupoddsJob.name + '/'+followupoddsJob.name+'-' \
        +trig.ifos+'-'+str(trig.statValue)+'_'+str(trig.eventID)+'_'+randomseed[0]+'.dat'
      self.add_var_opt("Nlive",Nlive)
      self.add_var_opt("GPSstart",GPSstart)
      self.add_var_opt("length",str(length))
      self.add_var_opt("approximant",Approximant)
      self.add_var_opt("out",outputname)
      # explicit floor division: same result as "/" on ints in Python 2
      self.add_var_opt("Nsegs",str((int(float(GPSend))-int(float(GPSstart)))//8))
      self.add_var_opt("dt",time_prior)
      # ifo is the last entry of IFOs at this point
      self.add_var_opt("end_time",trig.gpsTime[ifo])
      self.add_var_opt("Mmin",2.8)
      self.add_var_opt("Mmax",30)
      self.add_var_opt("srate",srate)
      self.add_var_opt("seed",randomseed[0])
      self.id = followupoddsJob.name + '-' + trig.ifos + '-' + str(trig.statValue) + '_' + str(trig.eventID) + '_' + randomseed[0]
      self.outputCache = trig.ifos + ' ' + followupoddsJob.name + ' ' +\
        self.id.split('-')[-1]+' '+outputname+'\n'
      self.add_var_opt("channel",cp.get("followup-coh-trigbank",trig.ifos[0:2]+"_channel").strip())

      self.setupNodeWeb(followupoddsJob,False,None,None,None,dag.cache)

      if opts.odds and length>=24:
        dag.addNode(self,self.friendlyName)
        self.validate()
      else:
        self.invalidate()

    except Exception:
      # KeyboardInterrupt / SystemExit are no longer swallowed here
      self.invalidate()
      if ifo is None:
        # failure happened before any ifo was processed
        gpsLabel = "unknown time"
      else:
        gpsLabel = str(trig.gpsTime[ifo])
      print("Couldn't add followupOdds job for " + gpsLabel)
1439 1440 1441 ########################################################################### 1442
class followupOddsPostJob(pipeline.CondorDAGJob,webTheJob):
  """
  Post-processing job for the odds jobs
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      oddsPostScript="OddsPostProc.py"
    )
  )

  def __init__(self,options,cp,tag_base='FOLLOWUPODDSPOST'):
    """
    options = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "oddsPostScript" option of the "condor" section.
    tag_base = second argument handed to setupJobWeb
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'followupOddsPostJob'
    self.__executable = cp.get('condor','oddsPostScript').strip()
    self.__universe = "vanilla"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
    self.setupJobWeb(self.__prog__,tag_base)
1464 1465 1466 ############################################################################## 1467
class followupOddsPostNode(pipeline.CondorDAGNode,webTheNode):
  """
  Runs the post-processing script
  """
  def __init__(self,oddsPostJob,procParams,trig,oddsjoblist,cp,opts,dag):
    """
    oddsPostJob = the CondorDAGJob this node is an instance of
    procParams = not used by this constructor
    trig = trigger; ifos, statValue, eventID, gpsTime and ifolist_in_coinc
           are read
    oddsjoblist = ids of the odds jobs whose .dat output is post-processed
    cp = configuration parser ("followup-odds" section is read)
    opts = options object; opts.odds switches the node on
    dag = DAG that is searched for parent nodes and receives this node

    Any failure while setting up the node is reported on stdout and the
    node is invalidated instead of raising.
    """
    try:
      self.friendlyName = 'Odds plotting job'
      pipeline.CondorDAGNode.__init__(self,oddsPostJob)

      # get the list of odds .dat files to be used as input; here we assume
      # that the directory name is oddsjobId.split('-')[0]
      for oddsjobId in oddsjoblist:
        oddsfile = oddsjobId.split('-')[0]+'/' + oddsjobId + '.dat'
        self.add_var_arg("--data " + oddsfile)

      # Get the number of live points in each run
      Nlive = cp.get('followup-odds','min-live').strip()
      self.add_var_opt("Nlive",Nlive)

      if cp.has_option('followup-odds','web') and cp.get('followup-odds','web').strip():
        outputpath = cp.get('followup-odds','web').strip()
      else:
        outputpath = oddsPostJob.name + "/" + str(trig.eventID)
      if not os.access(outputpath,os.F_OK):
        # makedirs (as used by the other nodes in this module) so that a
        # nested path does not fail when a parent directory is missing
        os.makedirs(outputpath)

      self.add_var_opt("outpath",outputpath)

      trigTag = trig.ifos + '-' + str(trig.statValue) + '_' + str(trig.eventID)
      self.id = oddsPostJob.name + '-' + trigTag

      self.outputCache = self.id.replace('-',' ') + " " + os.path.abspath(outputpath) + "/" + self.id + "\n"

      self.setupNodeWeb(oddsPostJob,False,dag.webPage.lastSection.lastSub,None,None,dag.cache)

      # every valid odds node of this trigger becomes a parent
      for node in dag.get_nodes():
        if isinstance(node,followupoddsNode) and trigTag in node.id:
          # Deliberately best effort: a node without validNode is skipped
          try:
            if node.validNode: self.add_parent(node)
          except Exception:
            pass

      if opts.odds:
        dag.addNode(self,self.friendlyName)
        self.validate()
      else:
        self.invalidate()

    except Exception:
      # KeyboardInterrupt / SystemExit are no longer swallowed here
      self.invalidate()
      print("couldn't add odds post job for " + str(trig.ifos) + "@ "+ str(trig.gpsTime[trig.ifolist_in_coinc[-1]]))
1519 1520 1521 ############################################################################## 1522
class followupmcmcJob(pipeline.CondorDAGJob, webTheJob):
  """
  An mcmc job
  """
  defaults = dict(
    section="condor",
    options=dict(
      universe="vanilla",
      followupmcmc="lalapps_followupMcmc"
    )
  )

  def __init__(self, options, cp, tag_base='FOLLOWUPMCMC'):
    """
    options = command line options (not used by this constructor)
    cp = configuration parser; checked against self.defaults with verifyCP
         and passed to modifyCP when that check fails. The executable is
         read from the "followupmcmc" option of the "condor" section.
    tag_base = second argument handed to setupJobWeb
    """
    jobDefaults = self.defaults
    if not verifyCP(cp,jobDefaults):
      modifyCP(cp,jobDefaults)

    self.__prog__ = 'followupmcmcJob'
    self.__executable = cp.get('condor','followupmcmc').strip()
    # this job runs in the standard universe, whatever self.defaults says
    self.__universe = "standard"
    pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
    self.setupJobWeb(self.__prog__,tag_base)
1544 1545 ############################################################################### 1546
1547 -class followupmcmcNode(pipeline.CondorDAGNode,webTheNode):
1548 """ 1549 Runs an instance of an mcmc followup job 1550 """ 1551 #def __init__(self, followupmcmcJob, procParams, ifo, trig, randomseed, cp,opts,dag):
1552 - def __init__(self, followupmcmcJob, procParams, trig, randomseed, cp, opts, dag, ifo=None):
1553 1554 try: 1555 time_margin = string.strip(cp.get('followup-mcmc','prior-coal-time-marg')) 1556 iterations = string.strip(cp.get('followup-mcmc','iterations')) 1557 tbefore = string.strip(cp.get('followup-mcmc','tbefore')) 1558 tafter = string.strip(cp.get('followup-mcmc','tafter')) 1559 massmin = string.strip(cp.get('followup-mcmc','massmin')) 1560 massmax = string.strip(cp.get('followup-mcmc','massmax')) 1561 dist90 = string.strip(cp.get('followup-mcmc','dist90')) 1562 dist10 = string.strip(cp.get('followup-mcmc','dist10')) 1563 1564 self.friendlyName = 'MCMC followup' 1565 pipeline.CondorDAGNode.__init__(self,followupmcmcJob) 1566 1567 if ifo: 1568 IFOs = [ifo] 1569 self.ifonames = ifo 1570 else: 1571 IFOs = trig.ifolist_in_coinc 1572 self.ifonames = trig.ifos 1573 1574 cacheFiles = "" 1575 channelNames = "" 1576 chunk_end_list = {} 1577 chunk_start_list = {} 1578 for itf in IFOs: 1579 for row in procParams[itf]: 1580 param = row.param.strip("-") 1581 value = row.value 1582 if param == 'frame-cache': cacheFile = value 1583 if param == 'channel-name': channel = value 1584 if param == 'gps-end-time': chunk_end = value 1585 if param == 'gps-start-time': chunk_start = value 1586 cacheFiles += cacheFile + "," 1587 channelNames += channel + "," 1588 chunk_end_list[itf] = chunk_end 1589 chunk_start_list[itf] = chunk_start 1590 1591 if len(IFOs) > 1: 1592 maxSNR = 0 1593 maxIFO = "" 1594 for trigger in trig.coincs: 1595 snr = trigger.snr 1596 if snr > maxSNR: 1597 maxSNR = snr 1598 maxIFO = trigger.ifo 1599 else: 1600 maxIFO = IFOs[0] 1601 trig_tempo = getattr(trig.coincs,maxIFO) 1602 triggerRef = copy.deepcopy(trig_tempo) 1603 self.ifoRef = maxIFO 1604 1605 self.add_var_opt("template",string.strip(cp.get('followup-mcmc','template'))) 1606 self.add_var_opt("iterations",iterations) 1607 self.add_var_opt("randomseed",randomseed) 1608 self.add_var_opt("tcenter","%0.3f"%trig.gpsTime[maxIFO]) 1609 self.add_var_opt("tbefore",tbefore) 1610 self.add_var_opt("tafter",tafter) 
1611 1612 tmin = trig.gpsTime[maxIFO] - float(time_margin) 1613 tmax = trig.gpsTime[maxIFO] + float(time_margin) 1614 self.add_var_opt("priorparameters","[" + massmin + "," + massmax +