
Source Code for Module pylal.stfu_pipe

"""
This module contains condor jobs / node classes for the SQLite Triggered Follow Up dag
"""

__author__ = 'Chad Hanna <channa@phys.lsu.edu>, Cristina Torres <cristina.torres@ligo.org>, Romain Gouaty <gouaty@lapp.in2p3.fr>'

import sqlite3

import sys, os, copy, math, subprocess, socket, re, string
import time as time_method
from optparse import *
import tempfile
import ConfigParser
import urlparse
from UserDict import UserDict

sys.path.append('@PYTHONLIBDIR@')

from glue import segments
from glue import segmentsUtils
from glue.ligolw import ligolw
from glue.ligolw import table
from glue.ligolw import lsctables
#from glue.ligolw import dbtables
from glue.ligolw import utils
from glue import pipeline
from glue import lal
#from pylal import db_thinca_rings
from lalapps import inspiral
from lalburst import date
from pylal.xlal import date as xlaldate
from pylal.xlal.datatypes.ligotimegps import LIGOTimeGPS
#dbtables.lsctables.LIGOTimeGPS = LIGOTimeGPS
lsctables.LIGOTimeGPS = LIGOTimeGPS

###############################################################################
##### UTILITY FUNCTIONS #######################################################
###############################################################################

def mkdir(output):
    # MAKE SURE WE CAN WRITE TO THE OUTPUT DIRECTORY
    if not os.access(output,os.F_OK): os.makedirs(output)
    else:
        if not os.access(output,os.W_OK):
            print >> sys.stderr, 'path '+output+' is not writable'
            sys.exit(1)

def science_run(time):
    if time >= 815155213 and time <= 875232014: return 's5'
    if time >= 931035296 and time <= 999999999: return 's6'
    print >>sys.stderr, "COULD NOT DETERMINE SCIENCE RUN from %d" % (int(time),)
    sys.exit(1)

def get_day_boundaries(time):

    # determine the start Time : 00:00:00 UTC from the day before
    # and the end time, 00:00:00 UTC the current day

    gps = LIGOTimeGPS(time)
    start_gps = int(date.utc_midnight(gps))
    end_gps = start_gps + 86400
    return str(start_gps),str(end_gps)

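# Example (illustrative sketch, not part of the original module): the two
# helpers above label a GPS time with its science run and give the GPS
# boundaries of the UTC day containing it.  The GPS time below is arbitrary.
#
#   >>> science_run(961545700)
#   's6'
#   >>> start, end = get_day_boundaries(961545700)   # GPS strings, end = start + 86400
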
def figure_out_type(time, ifo=None, data_type='hoft'):
    """
    Run boundaries (from CBC analyses):
      VSR1:    863557214 - 875232014
      S5:      815155213 - 875232014
      VSR2/S6: 931035296 - ...
    Frame types for S5/VSR1:
      () For RDS_R_L1 data set:
            type            channel_name
            RDS_R_L1        H1:LSC-DARM_ERR
            RDS_R_L1        H2:LSC-DARM_ERR
            RDS_R_L1        L1:LSC-DARM_ERR
      () For hoft data:
            type            channel_name
            H1_RDS_C03_L2   H1:LSC-STRAIN
            H2_RDS_C03_L2   H2:LSC-STRAIN
            L1_RDS_C03_L2   L1:LSC-STRAIN
            HrecV2_16384Hz  V1:h_16384Hz
    Frame types for S6/VSR2:
      () For RDS_R_L1 data set:
            type            channel_name
            H1_RDS_R_L1     H1:LSC-DARM_ERR
            L1_RDS_R_L1     L1:LSC-DARM_ERR
      () For hoft data:
            H1_LDAS_C00_L2  H1:LDAS-STRAIN
            L1_LDAS_C00_L2  L1:LDAS-STRAIN
            HrecOnline      V1:h_16384Hz
    """
    L1HoftTypes=(
        ("L1_RDS_C03_L2","L1:LSC-STRAIN",815155213,875232014),
        ("L1_LDAS_C00_L2","L1:LDAS-STRAIN",931035296,941997600),
        ("L1_LDAS_C02_L2","L1:LDAS-STRAIN",941997600,999999999)
        )
    H1HoftTypes=(
        ("H1_RDS_C03_L2","H1:LSC-STRAIN",815155213,875232014),
        ("H1_LDAS_C00_L2","H1:LDAS-STRAIN",931035296,941997600),
        ("H1_LDAS_C02_L2","H1:LDAS-STRAIN",941997600,999999999)
        )
    H2HoftTypes=(
        ("H2_RDS_C03_L2","H2:LSC-STRAIN",815155213,875232014),
        ("H1_LDAS_C00_L2","H1:LDAS-STRAIN",931035296,999999999)
        )
    V1HoftTypes=(
        ("HrecV2_16384Hz","V1:h_16384Hz",863557214,875232014),
        ("HrecOnline","V1:h_16384Hz",931035296,999999999)
        )
    L1RdsTypes=(
        ("RDS_R_L1","L1:LSC-DARM_ERR",815155213,875232014),
        ("L1_RDS_R_L1","L1:LSC-DARM_ERR",931035296,999999999)
        )
    H1RdsTypes=(
        ("RDS_R_L1","H1:LSC-DARM_ERR",815155213,875232014),
        ("H1_RDS_R_L1","H1:LSC-DARM_ERR",931035296,999999999)
        )
    H2RdsTypes=(
        ("RDS_R_L1","H2:LSC-DARM_ERR",815155213,875232014),
        ("H1_RDS_R_L1","H1:LSC-DARM_ERR",931035296,999999999)
        )
    V1RdsTypes=(
        ("raw","V1:Pr_B1_ACp",863557214,875232014),
        ("raw","V1:Pr_B1_ACp",931035296,999999999)
        )
    channelMap={
        "L1":{"hoft":L1HoftTypes,"rds":L1RdsTypes},
        "H1":{"hoft":H1HoftTypes,"rds":H1RdsTypes},
        "H2":{"hoft":H2HoftTypes,"rds":H2RdsTypes},
        "V1":{"hoft":V1HoftTypes,"rds":V1RdsTypes}
        }
    #Use the IFO type to select the channel type
    foundType=""
    foundChannel=""
    if ifo == None:
        print time," ifo argument to figure_out_type should not be null!"
        os.abort()

    for type,channel,start,stop in channelMap[ifo][data_type]:
        if ((start<=time) and (time<=stop)):
            foundType=type
            foundChannel=channel
            break
    if foundType == "":
        print time,ifo + " not found in method stfu_pipe.figure_out_type"
        os.abort()
    return str(foundType), str(foundChannel)

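# Example (illustrative only): figure_out_type() maps a GPS time and IFO to the
# frame type and channel listed in the docstring above, e.g. for S6 H1 h(t)
# data after GPS 941997600:
#
#   >>> figure_out_type(961545700, ifo='H1', data_type='hoft')
#   ('H1_LDAS_C02_L2', 'H1:LDAS-STRAIN')
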
def figure_out_cache(time,ifo):

    cacheList=(
        (home_dirs()+"/romain/followupbackgrounds/omega/S5/background/background_815155213_875232014.cache",815155213,875232014,"H1H2L1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/S6a/background/background_931035296_935798415.cache",931035296,935798415,"H1L1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/S6b/background/background_937800015_944587815.cache",935798415,944587815,"H1L1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/S6b/background/background_944587815_947260815.cache",944587815,947260815,"H1L1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/S6c/background/background_948672015_961545615.cache",948672015,961545687,"H1L1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/S6d/background/background_961545607_968803223.cache",961545687,999999999,"H1L1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/VSR2aRerun/background/background_931035296_935798415.cache",931035296,935798415,"V1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/VSR2bRerun/background/background_937800015_947260815.cache",935798415,947116815,"V1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/VSR3preCommissioning/background/background_966124815_968025615.cache",964310415,968284815,"V1"),
        (home_dirs()+"/romain/followupbackgrounds/omega/VSR3postCommissioning/background/background_968544015_971568015.cache",968284815,999999999,"V1")
        )

    foundCache = ""
    for cacheFile,start,stop,ifos in cacheList:
        if ((start<=time) and (time<stop) and ifo in ifos):
            foundCache = cacheFile
            break

    if 'phy.syr.edu' in get_hostname():
        foundCache = foundCache.replace("romain","rgouaty")

    if foundCache == "":
        print ifo, time, " not found in method stfu_pipe.figure_out_cache"
    else:
        if not os.path.isfile(foundCache):
            print "file " + foundCache + " not found"
            foundCache = ""

    return foundCache

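# Example (sketch): figure_out_cache() returns the omega-scan background cache
# matching the trigger time and IFO, or an empty string if no matching cache
# file is readable on the current cluster.
#
#   >>> figure_out_cache(961545700, 'H1')
#   '.../followupbackgrounds/omega/S6d/background/background_961545607_968803223.cache'
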
def home_dirs():
    return os.path.split(os.path.abspath(os.environ['HOME']))[0]

def get_hostname():
    host = socket.getfqdn()
    return host

###############################################################################
##### USEFUL FUNCTIONS CALLED BY PYTHON JOBS ##################################
###############################################################################

def getParamsFromCache(fileName,type,ifo=None,time=None):
    qscanList = []
    cacheList = lal.Cache.fromfile(open(fileName))
    if not cacheList:
        return qscanList
    cacheSelected = cacheList.sieve(description=type,ifos=ifo)
    if time:
        if math.floor(float(time)) != math.ceil(float(time)):
            cacheSelected = cacheSelected.sieve(segment=segments.segment(math.floor(float(time)), math.ceil(float(time))))
        else:
            cacheSelected = cacheSelected.sieve(segment=segments.segment(math.floor(float(time))-0.5, math.floor(float(time))+0.5))

    for cacheEntry in cacheSelected:
        path_output = cacheEntry.path
        time_output = str(cacheEntry.segment[0])
        type_output = cacheEntry.description
        ifo_output = cacheEntry.observatory
        qscanList.append([path_output,time_output,type_output,ifo_output])

    return qscanList

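# Example (sketch; the cache file name and description below are hypothetical):
# getParamsFromCache() sieves a LAL cache for entries with a given description
# and IFO near a GPS time, returning [path, start time, description, ifo]
# lists:
#
#   # for path, gps, desc, ifo in getParamsFromCache('followup_pipe.cache',
#   #                                                'WPIPELINE_FG_HT', 'H1', '961545700.0'):
#   #     print path, gps
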

###############################################################################
##### CONDOR JOB CLASSES ######################################################
###############################################################################

# DO SOME STANDARD STUFF FOR FU JOBS
class FUJob(object):
    """
    """

    def __init__(self):
        pass

    def __conditionalLoadDefaults__(self,defaults,cp):
        if not(cp.has_section(defaults["section"])):
            cp.add_section(defaults["section"])
        for key, val in defaults["options"].iteritems():
            if not cp.has_option(defaults["section"], key):
                cp.set(defaults["section"], key, val)

    def setupJob(self, name='job', dir= '', tag_base=None, cp=None):
        # Give this job a name.  Then make directories for the log files and such
        # This name is important since these directories will be included in
        # the web tree.
        if dir and not os.path.isdir(dir):
            os.mkdir(dir)
        self.name = name + "_" + tag_base + "_" + os.path.split(dir.rstrip('/'))[1]
        if dir:
            self.relPath = dir + '/' + self.name
        else:
            self.relPath = self.name
        self.outputPath = os.getcwd() + '/' + self.relPath + '/'
        self.tag_base = tag_base
        if not os.path.isdir(self.relPath):
            os.mkdir(self.relPath)
        if not os.path.isdir(self.relPath+'/logs'):
            os.mkdir(self.relPath+'/logs')
        if not os.path.isdir(self.relPath+'/Images'):
            os.mkdir(self.relPath+'/Images')
        if not os.path.isdir(self.relPath+'/DataProducts'):
            os.mkdir(self.relPath+'/DataProducts')
        # Set up the usual stuff and name the log files appropriately
        try: self.add_condor_cmd('environment',"KMP_LIBRARY=serial;MKL_SERIAL=yes")
        except: pass
        self.set_sub_file(self.name+'.sub')
        self.set_stdout_file(self.outputPath+'/logs/'+self.name+'-$(macroid).out')
        self.set_stderr_file(self.outputPath+'/logs/'+self.name+'-$(macroid).err')
        if cp:
            if cp.has_section("condor-memory-requirement") and cp.has_option("condor-memory-requirement",name):
                requirement = cp.getint("condor-memory-requirement",name)
                self.add_condor_cmd("Requirements", "(Memory > " + str(requirement) + ")")
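# Minimal sketch (not part of the module) of how FUJob is used by the concrete
# job classes below: inherit from a Condor job class and FUJob, call the Condor
# constructor, then call setupJob(), which creates the <name>_<tag>_<dir>
# directory with logs/, Images/ and DataProducts/ subdirectories and names the
# .sub/.out/.err files.  The 'example' option name below is hypothetical.
#
#   class exampleJob(pipeline.CondorDAGJob, FUJob):
#       def __init__(self, opts, cp, dir='', tag_base=''):
#           self.__executable = string.strip(cp.get('fu-condor', 'example'))
#           self.name = os.path.split(self.__executable.rstrip('/'))[1]
#           pipeline.CondorDAGJob.__init__(self, "vanilla", self.__executable)
#           self.add_condor_cmd('getenv', 'True')
#           self.setupJob(name=self.name, dir=dir, cp=cp, tag_base=tag_base)
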
# QSCAN JOB CLASS
class qscanJob(pipeline.CondorDAGJob, FUJob):
    """
    A qscan job
    """
    def __init__(self, opts, cp, dir='', tag_base=''):
        """
        """
        self.__executable = string.strip(cp.get('fu-condor','qscan'))
        self.name = os.path.split(self.__executable.rstrip('/'))[1]
        self.__universe = "vanilla"
        pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable)
        self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
        self.setup_checkForDir()
        self.setup_rm_lock()

    def is_dax(self):
        return False

    def setup_checkForDir(self):
        # create a shell script to check for the existence of the qscan output directory and rename it if needed
        checkdir_script = open('checkForDir.sh','w')
        checkdir_script.write("""#!/bin/bash
if [ -d $1/$2 ]
then
matchingList=$(echo $(find $1 -name $2.bk*))
COUNTER=1
for file in $matchingList
   do
     let COUNTER=COUNTER+1
   done
mv $1/$2 $1/$2.bk.$COUNTER
fi
""")
        checkdir_script.close()
        os.chmod('checkForDir.sh',0755)

    def setup_rm_lock(self):
        rm_lock_script = open('rmLock.sh','w')
        rm_lock_script.write("#!/bin/bash\nif [ -e $1 ]\nthen\n\trm $1\nfi")
        rm_lock_script.close()
        os.chmod('rmLock.sh',0755)
309 310 # CLASS TO SETUP THE PROXY FILE NEEDED FOR REMOTE JOBS (CONDOR FLOCKING) 311
312 -class setupProxyJob(pipeline.CondorDAGJob, FUJob):
313 """ 314 """
315 - def __init__(self, opts, cp, dir='', tag_base=''):
316 self.setup_proxy_script() 317 self.__executable = "getProxy.sh" 318 self.name = os.path.split(self.__executable.rstrip('/'))[1] 319 self.__universe = "local" 320 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 321 self.add_condor_cmd('getenv','True') 322 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
323
324 - def setup_proxy_script(self):
325 proxy_script = open('getProxy.sh','w') 326 proxy_script.write("""#!/bin/bash 327 if [ ! -e \"proxy.pem\" ] 328 then 329 file=`grid-proxy-info -path` 330 cp ${file} proxy.pem 331 fi 332 """) 333 proxy_script.close() 334 os.chmod('getProxy.sh',0755)
335 336 # DISTRIBUTE REMOTE QSCANS CLASS 337
338 -class distribRemoteQscanJob(pipeline.CondorDAGJob, FUJob):
339 """ 340 This class sets up a script to be run as a child of the remote scans in order to distribute their results to the appropriate paths. It takes the qscan tarball as input, uncompresses it and copies the results to the path specified in the cache file. 341 Moreover, this job also deletes the temporary remote datafind cache files in order to clean up the followup directory. 342 """
343 - def __init__(self, opts, cp, dir='', tag_base=''):
344 """ 345 """ 346 self.setup_distrib_script(dir,tag_base) 347 self.__executable = 'distribRemoteScan_'+dir+'_'+tag_base+'.sh' 348 self.name = os.path.split(self.__executable.rstrip('/'))[1] 349 self.__universe = "vanilla" 350 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 351 self.add_condor_cmd('getenv','True') 352 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
353
354 - def setup_distrib_script(self,dir,tag_base):
355 distrib_script = open('distribRemoteScan_'+dir+'_'+tag_base+'.sh','w') 356 distrib_script.write("""#!/bin/bash 357 currentPath=`pwd` ; 358 mv $1 $2/. ; 359 cd $2 ; 360 tar -xzvf $1 ; 361 cd $currentPath ; 362 for figPath in `find $2/$3 -name "*.png" -print` ; do 363 echo $figPath ; 364 thumbPath=`echo $figPath | sed s/.png/.thumb.png/g` ; 365 figDPI=120; 366 fullSize='600x'; 367 thumbSize='300x'; 368 convert -resize $thumbSize -strip -depth 8 -colors 256 $figPath $thumbPath ; 369 done 370 rm $2/$1 ; 371 if [ -e $4 ] 372 then 373 rm $4 ; 374 fi 375 """) 376 distrib_script.close() 377 os.chmod('distribRemoteScan_'+dir+'_'+tag_base+'.sh',0755)
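# Usage note (inferred from distribRemoteQscanNode below, not part of the
# original source): the generated script is invoked with four arguments,
#
#   distribRemoteScan_<dir>_<tag>.sh <scan tarball> <destination path> <scan time> <remote datafind cache>
#
# i.e. the tarball $1 is moved into $2 and untarred, thumbnails are regenerated
# for the scan directory $2/$3, and the temporary datafind cache $4 is removed.
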
378 379 # REMOTE DATAFIND JOB 380
381 -class remoteDatafindJob(pipeline.CondorDAGJob, FUJob):
382 """ 383 """
384 - def __init__(self, opts, cp, tag_base='', dir=''):
385 386 self.setup_df_submission_script(dir,tag_base) 387 self.__executable = 'remoteDatafind_'+dir+'_'+tag_base+'.sh' 388 self.dir = dir 389 self.name = os.path.split(self.__executable.rstrip('/'))[1] 390 self.__universe = "vanilla" 391 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 392 self.add_condor_cmd('getenv','True') 393 self.add_condor_cmd('input','proxy.pem') 394 self.add_condor_cmd('should_transfer_files','yes') 395 self.add_condor_cmd('when_to_transfer_output','ON_EXIT_OR_EVICT') 396 self.add_condor_cmd('+RunOnEGEEGrid','True') 397 self.add_condor_cmd("Requirements","(Arch == \"INTEL\" || Arch == \"X86_64\" ) && ( Pilot_SiteName == \"Bologna\")") 398 self.add_condor_cmd('transfer_output_files','$(macrooutputfile)') 399 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
400
401 - def setup_df_submission_script(self,dir,tag_base):
402 submit_script = open('remoteDatafind_'+dir+'_'+tag_base+'.sh','w') 403 submit_script.write("""#!/bin/bash 404 . /opt/exp_software/virgo/etc/virgo-env.sh 405 . /opt/glite/etc/profile.d/grid-env.sh 406 export X509_USER_PROXY=`pwd`/proxy.pem 407 /opt/exp_software/virgo/lscsoft/etc/LSCdataFind --observatory $1 --gps-start-time $2 --gps-end-time $3 --url-type file --lal-cache --type $4 --output $5 408 outputCache=$5 409 outputQcache=${outputCache/.cache/.qcache} 410 /storage/gpfs_virgo3/virgo/omega/omega_r3270_glnxa64_binary/bin/convertlalcache $5 %s-%s-$outputQcache 411 """%(dir,tag_base)) 412 submit_script.close() 413 os.chmod('remoteDatafind_'+dir+'_'+tag_base+'.sh',0755)
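# Usage note (inferred from remoteDatafindNode below): the submission script is
# called as
#
#   remoteDatafind_<dir>_<tag>.sh <observatory> <gps start> <gps end> <frame type> <output .cache name>
#
# It runs LSCdataFind on the remote site and converts the resulting LAL cache
# into an omega .qcache with convertlalcache.
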
414 415 # REMOTE QSCAN CLASS 416
417 -class remoteQscanJob(pipeline.CondorDAGJob, FUJob):
418 """ 419 A remote qscan job 420 """
421 - def __init__(self, opts, cp, dir='', tag_base=''):
422 """ 423 """ 424 self.setup_submission_script(dir,tag_base) 425 self.__executable = "remoteScan_"+dir+"_"+tag_base+".sh" 426 self.dir = dir 427 self.name = os.path.split(self.__executable.rstrip('/'))[1] 428 self.__universe = "vanilla" 429 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 430 self.add_condor_cmd('getenv','True') 431 self.add_condor_cmd('input','proxy.pem') 432 self.add_condor_cmd('should_transfer_files','yes') 433 self.add_condor_cmd('when_to_transfer_output','ON_EXIT_OR_EVICT') 434 self.add_condor_cmd('+RunOnEGEEGrid','True') 435 self.add_condor_cmd("Requirements","(Arch == \"INTEL\" || Arch == \"X86_64\" ) && ( Pilot_SiteName == \"Bologna\")") 436 self.add_condor_cmd('transfer_output_files','$(macrooutputfile)') 437 self.add_condor_cmd('transfer_input_files','$(macroinputfile)') 438 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
439
440 - def setup_submission_script(self,dir,tag_base):
441 submit_script = open('remoteScan_'+dir+'_'+tag_base+'.sh','w') 442 submit_script.write("""#!/bin/bash 443 . /opt/exp_software/virgo/etc/virgo-env.sh 444 . /opt/glite/etc/profile.d/grid-env.sh 445 . /storage/gpfs_virgo3/virgo/omega/omega_env.sh 446 export X509_USER_PROXY=`pwd`/proxy.pem 447 /storage/gpfs_virgo3/virgo/omega/omega_r3270_glnxa64_binary/bin/wpipeline scan -r -c $1 -f $2 -o $3 $4 448 449 tar -czf %s-%s-$4.tgz $3 450 """%(dir,tag_base)) 451 submit_script.close() 452 os.chmod('remoteScan_'+dir+'_'+tag_base+'.sh',0755)
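# Usage note (inferred from fuRemoteQscanNode below): the remote scan script is
# called as
#
#   remoteScan_<dir>_<tag>.sh <qscan config> <frame cache> <output directory> <gps time>
#
# and tars up the resulting scan directory so that Condor can transfer it back
# to the submit machine.
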
453 454 # A CLASS TO ANALYSE QSCAN RESULTS
455 -class analyseQscanJob(pipeline.CondorDAGJob,FUJob):
456
457 - def __init__(self,opts,cp,dir='',tag_base=''):
458 459 self.__executable = string.strip(cp.get('fu-condor','analyseQscan')) 460 self.name = os.path.split(self.__executable.rstrip('/'))[1] 461 self.name_for_background = self.name + "_" + tag_base 462 self.__universe = "vanilla" 463 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 464 self.add_condor_cmd('getenv','True') 465 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
466 467 468 # A CLASS TO DO FOLLOWUP INSPIRAL JOBS
469 -class followUpInspJob(inspiral.InspiralJob,FUJob):
470
471 - def __init__(self,cp,dir='', tag_base=''):
472 473 inspiral.InspiralJob.__init__(self,cp) 474 475 if tag_base == 'head': 476 self.set_executable(string.strip(cp.get('fu-condor','inspiral_head'))) 477 #self.name = 'followUpInspJob' + type 478 479 self.name = os.path.split(self.get_executable().rstrip('/'))[1] 480 self.setupJob(name=self.name, dir=dir, cp=cp, tag_base=tag_base)
481 482 # JOB CLASS FOR PRODUCING A SKYMAP
483 -class skyMapJob(pipeline.CondorDAGJob,FUJob):
484 """ 485 Generates sky map data 486 """
487 - def __init__(self, options, cp, dir='', tag_base=''):
488 """ 489 """ 490 #self.__prog__ = 'lalapps_skyMapJob' 491 self.__executable = string.strip(cp.get('fu-condor','lalapps_skymap')) 492 self.__universe = "standard" 493 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 494 self.add_condor_cmd('getenv','True') 495 496 self.name = os.path.split(self.__executable.rstrip('/'))[1] 497 self.setupJob(name=self.name,dir=dir, tag_base=tag_base) 498 499 self.ra_res = cp.get("fu-skymap", 'ra-res') 500 self.dec_res = cp.get("fu-skymap", 'dec-res') 501 self.sample_rate = cp.get("fu-skymap", 'sample-rate')
502 503 # JOB CLASS FOR PRODUCING SKYMAP PLOT
504 -class skyMapPlotJob(pipeline.CondorDAGJob,FUJob):
505 """ 506 Plots the sky map output of lalapps_skymap 507 """
508 - def __init__(self, options, cp, dir='',tag_base=''):
509 """ 510 """ 511 #self.__prog__ = 'pylal_skyPlotJob' 512 self.__executable = string.strip(cp.get('fu-condor','pylal_skyPlotJob')) 513 self.__universe = "vanilla" 514 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 515 self.add_condor_cmd('getenv','True') 516 self.name = os.path.split(self.__executable.rstrip('/'))[1] 517 self.setupJob(name=self.name,dir=dir,tag_base=tag_base) 518 519 self.ra_res = cp.get("fu-skymap", 'ra-res') 520 self.dec_res = cp.get("fu-skymap", 'dec-res') 521 self.sample_rate = cp.get("fu-skymap", 'sample-rate')
522 523 # JOB CLASS FOR PLOTTING SNR AND CHISQ
524 -class plotSNRChisqJob(pipeline.CondorDAGJob,FUJob):
525 """ 526 A followup plotting job for snr and chisq time series 527 """
528 - def __init__(self, options, cp, dir='', tag_base=''):
529 """ 530 """ 531 #self.__prog__ = 'plotSNRCHISQJob' 532 self.__executable = string.strip(cp.get('fu-condor','plotsnrchisq')) 533 self.__universe = "vanilla" 534 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 535 self.add_condor_cmd('getenv','True') 536 537 self.name = os.path.split(self.__executable.rstrip('/'))[1] 538 self.setupJob(name=self.name,tag_base=tag_base,dir=dir)
539
540 -class fuDataFindJob(pipeline.LSCDataFindJob,FUJob):
541 - def __init__(self, config_file, dir='', tag_base=''):
542 543 #self.name = name 544 545 # unfortunately the logs directory has to be created before we call LSCDataFindJob 546 #try: 547 # os.mkdir(self.name) 548 # os.mkdir(self.name + '/logs') 549 #except: pass 550 551 self.__executable = string.strip(config_file.get('condor','datafind')) 552 553 # MUST DO THIS FIRST SO THE LOG DIRECTORIES ARE MADE 554 self.name = os.path.split(self.__executable.rstrip('/'))[1] 555 556 self.setupJob(name=self.name, tag_base=tag_base, dir=dir) 557 558 pipeline.LSCDataFindJob.__init__(self, self.relPath, self.relPath + '/logs', config_file) 559 self.setup_cacheconv(config_file)
560
561 - def setup_cacheconv(self,cp):
562 # create a shell script to call convertlalcache.pl if the value of $RETURN is 0 563 convert_script = open('cacheconv.sh','w') 564 #FIXME changed convert cache script to not fail on previous error? 565 convert_script.write("""#!/bin/bash 566 %s ${1} ${2} 567 if [ ${3} = \'y\' ]; then 568 cp ${2} . 569 fi 570 """ % string.strip(cp.get('fu-condor','convertcache'))) 571 convert_script.close() 572 os.chmod('cacheconv.sh',0755)
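# Usage note (inferred from fuDataFindNode.setup_qscan below): cacheconv.sh is
# run as a Condor post-script,
#
#   cacheconv.sh <LAL cache file> <output qcache file> <y|n>
#
# where the last argument controls whether the converted qcache is also copied
# into the run directory (needed when the scan itself runs remotely).
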
573 574 #This class is responsible for running the default job for making our 575 #wiki content
576 -class makeCheckListWikiJob(pipeline.CondorDAGJob,FUJob):
577 """ 578 This actually launches a default wiki creation job 579 """
580 - def __init__(self,opts,cp,dir='',tag_base=''):
581 """ 582 """ 583 self.__executable = string.strip(cp.get("fu-condor", 584 "makeCheckListWiki").strip()) 585 self.name = os.path.split(self.__executable.rstrip('/'))[1] 586 self.__universe = string.strip(cp.get("makeCheckListWiki", 587 "universe").strip()) 588 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 589 self.add_condor_cmd('getenv','True') 590 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base='_all')
591 #End makeCheckListWikiJob class 592 593 #This class is responsible for the followup page
594 -class makeFollowupPageJob(pipeline.CondorDAGJob,FUJob):
595 """ 596 This actually launches a followup page job 597 """
598 - def __init__(self,opts,cp,dir='',tag_base=''):
599 """ 600 """ 601 self.__executable = string.strip(cp.get("fu-condor", 602 "lalapps_followup_page").strip()) 603 self.name = os.path.split(self.__executable.rstrip('/'))[1] 604 self.__universe = "vanilla" 605 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 606 self.add_condor_cmd('getenv','True') 607 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base='_all')
608 #End makeFollowupPageJob class 609 610 611 #The class responsible for running the data quality flag finding job
612 -class findFlagsJob(pipeline.CondorDAGJob, FUJob):
613 """ 614 A job which queries the ldbd database holding segment 615 information about the DQ flags. 616 """ 617 defaults={"section":"fu-condor", 618 "options":{"universe":"local", 619 "dqflags":"followupQueryDQ.py"} 620 } 621
622 - def __init__(self, opts, cp, dir='', tag_base=""):
623 """ 624 """ 625 self.__conditionalLoadDefaults__(findFlagsJob.defaults,cp) 626 #self.__prog__ = 'findFlagsJob' 627 self.__executable = string.strip(cp.get('fu-condor','dqflags')) 628 self.__universe = string.strip(cp.get('fu-condor','universe')) 629 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 630 self.add_condor_cmd('getenv','True') 631 self.name = os.path.split(self.__executable.rstrip('/'))[1] 632 self.setupJob(name=self.name,tag_base=tag_base, dir=dir)
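# Sketch (illustrative only) of how the "defaults" dictionaries interact with
# the user's ini file: __conditionalLoadDefaults__() only fills in options that
# the ConfigParser does not already define, so an existing [fu-condor] dqflags
# entry always takes precedence.
#
#   # cp = ConfigParser.ConfigParser()
#   # inside findFlagsJob.__init__:
#   #   self.__conditionalLoadDefaults__(findFlagsJob.defaults, cp)
#   # cp.get('fu-condor', 'dqflags')  ->  'followupQueryDQ.py' unless overridden
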
633 634 #The class responsible for checking for known veto flags
635 -class findVetosJob(pipeline.CondorDAGJob,FUJob):
636 """ 637 A job instance that queries the segment database for known 638 types of active veto intervals. 639 """ 640 defaults={"section":"fu-condor", 641 "options":{"universe":"local", 642 "vetoflags":"followupQueryVeto.py"} 643 }
644 - def __init__(self, opts,cp, dir='', tag_base=""):
645 """ 646 """ 647 self.__conditionalLoadDefaults__(findVetosJob.defaults,cp) 648 #self.__prog__ = 'findVetosJob' 649 self.__executable = string.strip(cp.get('fu-condor','vetoflags')) 650 self.__universe = string.strip(cp.get('fu-condor','universe')) 651 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 652 self.add_condor_cmd('getenv','True') 653 self.name = os.path.split(self.__executable.rstrip('/'))[1] 654 self.setupJob(name=self.name,tag_base=tag_base, dir=dir)
655 656 #The class responsible for Job Object running the customFOM builder python 657 #script!
658 -class customFOMPlotJob(pipeline.CondorDAGJob,FUJob):
659 """ 660 This is a job class which allows us to wrap up the script for 661 creating customized figure of merit (FOM) plots. The script, 662 followupCustomFOM.py, actually contains a call to 663 ligo_data_find, via a subprocess. This removes our need 664 to have a datafind parent job. 665 """ 666 defaults={"section":"fu-condor", 667 "options":{"universe":"vanilla", 668 "customfom":"followupCustomFOM.py"} 669 }
670 - def __init__(self, opts, cp, dir='', tag_base=""):
671 """ 672 """ 673 self.__conditionalLoadDefaults__(customFOMPlotJob.defaults,cp) 674 #self.__prog__ = 'customFOMPlotJob' 675 self.__executable = string.strip(cp.get('fu-condor','customfom')) 676 self.__universe = string.strip(cp.get('fu-condor','universe')) 677 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 678 self.add_condor_cmd('getenv','True') 679 self.name = os.path.split(self.__executable.rstrip('/'))[1] 680 self.setupJob(name=self.name,tag_base=tag_base, dir=dir)
681 682 683 #The class responsible for running one type of parameter consistency check
684 -class effDRatioJob(pipeline.CondorDAGJob,FUJob):
685 """ 686 A job that performs a parameter consistency check for a trigger 687 being followed up. 688 """ 689 defaults={"section":"fu-condor", 690 "options":{"universe":"local", 691 "effDRatio":"followupRatioTest.py"} 692 }
693 - def __init__(self, opts, cp, dir='', tag_base=""):
694 """ 695 """ 696 self.__conditionalLoadDefaults__(effDRatioJob.defaults,cp) 697 #self.__prog__ = 'effDRatioTest' 698 self.__executable = string.strip(cp.get('fu-condor','effDRatio')) 699 self.__universe = string.strip(cp.get('fu-condor','universe')) 700 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 701 self.add_condor_cmd('getenv','True') 702 self.name = os.path.split(self.__executable.rstrip('/'))[1] 703 self.setupJob(name=self.name,tag_base=tag_base, dir=dir)
704 705 # Follow up chia job
706 -class followUpChiaJob(inspiral.ChiaJob,FUJob):
707 """ 708 Generates coherent inspiral data 709 """ 710 defaults={ 711 "section":"condor", 712 "options":{ 713 "universe":"vanilla", 714 "chia":"lalapps_coherent_inspiral" 715 } 716 } 717
718 - def __init__(self, options, cp, dir='', tag_base=''):
719 """ 720 """ 721 self.__conditionalLoadDefaults__(followUpChiaJob.defaults,cp) 722 #self.__prog__ = 'followUpChiaJob' 723 self.__executable = string.strip(cp.get('condor','chia')) 724 self.__universe = "standard" 725 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 726 self.add_condor_cmd('getenv','True') 727 728 self.name = os.path.split(self.__executable.rstrip('/'))[1] 729 self.setupJob(name=self.name,tag_base=tag_base, dir=dir)
730 731 732 ############################################################################## 733 # jobs class for plotting coherent inspiral search and null stat timeseries 734
735 -class plotChiaJob(pipeline.CondorDAGJob,FUJob):
736 """ 737 A followup plotting job for coherent inspiral search and null stat timeseries 738 """ 739 defaults={ 740 "section":"condor", 741 "options":{ 742 "universe":"vanilla", 743 "plotchiatimeseries":"plotchiatimeseries" 744 } 745 } 746
747 - def __init__(self, options, cp, dir, tag_base=''):
748 """ 749 """ 750 #if not(verifyCP(cp,self.defaults)): modifyCP(cp,self.defaults) 751 self.__executable = string.strip(cp.get('fu-condor','plotchiatimeseries')) 752 self.__universe = "vanilla" 753 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 754 self.name = os.path.split(self.__executable.rstrip('/'))[1] 755 self.add_condor_cmd('getenv','True') 756 self.setupJob(name=self.name,tag_base=tag_base, dir=dir)
757 758 ############################################################################## 759 # jobs class for setting a mcmc run 760
761 -class mcmcJob(pipeline.CondorDAGJob, FUJob):
762 """ 763 A job to set up a mcmc run 764 """
765 - def __init__(self,opts,cp,dir='',tag_base=''):
766 """ 767 """ 768 self.__executable = string.strip(cp.get('fu-condor','mcmc')) 769 self.name = os.path.split(self.__executable.rstrip('/'))[1] 770 self.__universe = "standard" 771 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 772 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
773 774 ############################################################################## 775 # jobs class for setting a spinmcmc run 776
777 -class spinmcmcJob(pipeline.CondorDAGJob, FUJob):
778 """ 779 A job to set up a spinmcmc run 780 """
781 - def __init__(self,opts,cp,dir='',tag_base=''):
782 self.__executable = string.strip(cp.get('fu-condor','spinmcmc')) 783 self.name = os.path.split(self.__executable.rstrip('/'))[1] 784 self.__universe = "standard" 785 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 786 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
787 788 789 ############################################################################## 790 # jobs class for setting a the plotting of mcmc results 791
792 -class plotmcmcJob(pipeline.CondorDAGJob, FUJob):
793 """ 794 A job to set up a plotmcmc run 795 """
796 - def __init__(self,opts,cp,dir='',tag_base=''):
797 """ 798 """ 799 self.__executable = string.strip(cp.get('fu-condor','plotmcmc')) 800 self.name = os.path.split(self.__executable.rstrip('/'))[1] 801 self.__universe = "vanilla" 802 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 803 self.add_condor_cmd('getenv','True') 804 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
805 806 ############################################################################## 807 # jobs class for setting a the plotting of spinmcmc results 808
809 -class plotspinmcmcJob(pipeline.CondorDAGJob, FUJob):
810 """ 811 A job to set up a plotspinmcmc run 812 """
813 - def __init__(self,opts,cp,dir='',tag_base=''):
814 """ 815 """ 816 self.__executable = string.strip(cp.get('fu-condor','plotspinmcmc')) 817 self.name = os.path.split(self.__executable.rstrip('/'))[1] 818 self.__universe = "vanilla" 819 pipeline.CondorDAGJob.__init__(self,self.__universe,self.__executable) 820 self.add_condor_cmd('getenv','True') 821 self.setupJob(name=self.name,dir=dir,cp=cp,tag_base=tag_base)
822 823 824 ############################################################################# 825 ###### CONDOR NODE CLASSES ################################################## 826 ############################################################################# 827
828 -class FUNode:
829 """ 830 """ 831
832 - def __init__(self):
833 self.invalidate()
834
835 - def validate(self):
836 self.validNode = True
837
838 - def invalidate(self):
839 self.validNode = False
840
841 - def __conditionalLoadDefaults__(self,defaults,cp):
842 if not(cp.has_section(defaults["section"])): 843 cp.add_section(defaults["section"]) 844 for key, val in defaults["options"].iteritems(): 845 if not cp.has_option(defaults["section"], key): 846 cp.set(defaults["section"], key, val)
847
848 - def setupPlotNode(self, job):
849 self.add_var_opt("output-path",job.outputPath) 850 self.add_var_opt("enable-output","")
851 852 # QSCAN NODE
853 -class fuQscanNode(pipeline.CondorDAGNode,FUNode):
854 """ 855 QScan node. This node writes its output to the web directory specified in 856 the inifile + the ifo and gps time. For example: 857 858 /archive/home/channa/public_html/followup/htQscan/H1/999999999.999 859 860 The omega scan command line is 861 862 wpipeline scan -r -c H1_hoft.txt -f H-H1_RDS_C03_L2-870946612-870946742.qcache -o QSCAN/foreground-hoft-qscan/H1/870946677.52929688 870946677.52929688 863 864 """
865 - def __init__(self, dag, job, cp, opts, time, ifo, frame_cache, p_nodes=[], type="ht", variety="fg"):
866 """ 867 """ 868 pipeline.CondorDAGNode.__init__(self,job) 869 870 self.scan_type = variety.upper() + "_" + type.replace("seismic","seis").upper() 871 self.scan_ifo = ifo 872 preString="omega/"+ifo+"/%s/"+science_run(time).upper()+"" 873 if variety == "bg": 874 self.add_var_arg('scan') 875 preString = preString%("background") 876 oldPreString="omega/" + science_run(time).upper() + "/background" 877 else: 878 self.add_var_arg('scan -r') 879 preString = preString%("foreground") 880 oldPreString="omega/" + science_run(time).upper() + "/foreground" 881 config = self.fix_config_for_science_run( cp.get('fu-'+variety+'-'+type+'-qscan', ifo+'config').strip(), time ) 882 if cp.get('fu-'+variety+'-'+type+'-qscan', ifo+'config').strip() != config: 883 cp.set('fu-'+variety+'-'+type+'-qscan',ifo+'config',config) 884 self.add_var_arg("-c " + config ) 885 886 if type == "ht": 887 dataString = figure_out_type(time, ifo, 'hoft')[0] 888 else: 889 dataString = figure_out_type(time, ifo, 'rds')[0] 890 if type == "seismic": 891 dataString = dataString + "_SEIS" 892 if dataString[:2]!=ifo: 893 dataString = ifo + "_" + dataString 894 timeString = "-".join(get_day_boundaries(int(time))) 895 if cp.has_option('fu-output','output-dir') and cp.get('fu-output','output-dir'): 896 output = cp.get('fu-output','output-dir') + '/' + preString + '/' + dataString + '/' + timeString 897 else: 898 output = os.getcwd() + '/' + preString + '/' + dataString + '/' + timeString 899 900 # CREATE AND MAKE SURE WE CAN WRITE TO THE OUTPUT DIRECTORY 901 mkdir(output) 902 903 output_path = output+"/"+str(time) 904 self.add_var_arg("-o " + output_path) 905 906 self.output_cache = lal.CacheEntry(ifo, job.name.upper(), segments.segment(float(time), float(time)), "file://localhost/"+output_path) 907 # ADD FRAME CACHE FILE 908 self.add_var_arg("-f "+frame_cache) 909 910 self.add_var_arg(repr(time)) 911 912 self.set_pre_script( "checkForDir.sh %s %s" %(output, str(time)) ) 913 #FIXME is deleting the lock file the right thing to do? 914 self.set_post_script( "rmLock.sh %s/%s/lock.txt" %(output, str(time)) ) 915 916 if not opts.disable_dag_categories: 917 self.set_category(job.name.lower()) 918 919 if not(cp.has_option('fu-remote-jobs','remote-jobs') and job.name in cp.get('fu-remote-jobs','remote-jobs') and cp.has_option('fu-remote-jobs','remote-ifos') and ifo in cp.get('fu-remote-jobs','remote-ifos')): 920 for node in p_nodes: 921 if node.validNode: 922 self.add_parent(node) 923 if not (type=="ht" and opts.no_ht_qscan) and not (type=="rds" and opts.no_rds_qscan) and not (type=="seismic" and opts.no_seismic_qscan): 924 dag.add_node(self) 925 self.validate() 926 else: 927 self.invalidate() 928 else: 929 self.invalidate()
930
931 - def fix_config_for_science_run(self, config, time):
932 run = science_run(time) 933 config_path = os.path.split(config) 934 out = "/".join([config_path[0], config_path[1].replace('s5',run).replace('s6',run)]) 935 return out
936 937 # SETUP PROXY FOR REMOTE SCANS
938 -class setupProxyNode(pipeline.CondorDAGNode,FUNode):
939
940 - def __init__(self, dag, job, cp, opts):
941 942 pipeline.CondorDAGNode.__init__(self,job) 943 dag.add_node(self) 944 self.validate()
945 946 # DISTRIBUTE REMOTE QSCAN RESULTS
947 -class distribRemoteQscanNode(pipeline.CondorDAGNode,FUNode):
948
949 - def __init__(self, dag, job, cp, opts, ifo, time, p_nodes=[], type=""):
950 951 pipeline.CondorDAGNode.__init__(self,job) 952 self.scan_type = type.replace("seismic","seis").upper() 953 self.scan_ifo = ifo 954 # WARNING: First element in p_nodes list is assumed to be the omega scan node 955 self.add_var_arg(p_nodes[0].name_output_file) 956 self.add_var_arg(p_nodes[0].output_path) 957 self.add_var_arg(str(time)) 958 # WARNING: Second element in p_nodes list is assumed to be the datafind node 959 self.add_var_arg(p_nodes[1].localFileName) 960 961 for node in p_nodes: 962 if node.validNode: 963 self.add_parent(node) 964 dag.add_node(self) 965 self.validate()
966 967 # REMOTE DATAFIND NODE
968 -class remoteDatafindNode(pipeline.CondorDAGNode,FUNode):
969
970 - def __init__(self, dag, job, cp, opts, ifo, time, data_type="rds", p_nodes=[]):
971 pipeline.CondorDAGNode.__init__(self,job) 972 973 type, channel = figure_out_type(time,ifo,data_type) 974 q_time = float(cp.get("fu-q-"+data_type+"-datafind",ifo+"-search-time-range"))/2. 975 start_time = int( time - q_time - 1.) 976 end_time = int( time + q_time + 1.) 977 outputFileName = ifo[0]+'-'+type+'-'+str(start_time)+'-'+str(end_time)+'.qcache' 978 979 # THIS IS THE DIRECTORY WHERE THE DATA WILL ULTIMATELY BE COPIED ONCE THE DATAPRODUCT ARE SENT BACK LOCALLY 980 #self.output_cache = lal.CacheEntry(ifo, job.name.upper(), segments.segment(start_time, end_time), "file://localhost/"+job.outputPath+'/'+os.path.abspath(outputFileName)) 981 982 self.add_var_arg(ifo[0]) 983 self.add_var_arg(str(start_time)) 984 self.add_var_arg(str(end_time)) 985 self.add_var_arg(type) 986 self.add_var_arg(outputFileName.replace('qcache','cache')) 987 988 self.name_output_file = job.dir + "-" + job.tag_base + "-" + outputFileName 989 self.add_macro("macrooutputfile", self.name_output_file) 990 991 if not opts.disable_dag_categories: 992 self.set_category(job.name.lower()) 993 for node in p_nodes: 994 if node.validNode: 995 self.add_parent(node) 996 dag.add_node(self) 997 self.validate()
998 999 1000 # REMOTE QSCAN NODE
1001 -class fuRemoteQscanNode(pipeline.CondorDAGNode,FUNode):
1002 """ 1003 """
1004 - def __init__(self, dag, job, cp, opts, time, ifo, p_nodes=[], type="ht", variety="fg"):
1005 1006 pipeline.CondorDAGNode.__init__(self,job) 1007 1008 self.scan_type = variety.upper() + "_" + type.replace("seismic","seis").upper() 1009 self.scan_ifo = ifo 1010 preString="omega/"+ifo+"/%s/"+science_run(time).upper()+"" 1011 if variety == "bg": 1012 preString = preString%("background") 1013 else: 1014 preString = preString%("foreground") 1015 config = cp.get('fu-'+variety+'-'+type+'-qscan', ifo+'config').strip() 1016 self.add_var_arg( config ) 1017 1018 if type == "ht": 1019 dataString = figure_out_type(time, ifo, 'hoft')[0] 1020 else: 1021 dataString = figure_out_type(time, ifo, 'rds')[0] 1022 if type == "seismic": 1023 dataString = dataString + "_SEIS" 1024 if dataString[:2]!=ifo: 1025 dataString = ifo + "_" + dataString 1026 timeString = "-".join(get_day_boundaries(int(time))) 1027 if cp.has_option('fu-output','output-dir') and cp.get('fu-output','output-dir'): 1028 output = cp.get('fu-output','output-dir') + '/' + preString + '/' + dataString + '/' + timeString 1029 else: 1030 output = os.getcwd() + '/' + preString + '/' + dataString + '/' + timeString 1031 1032 # CREATE AND MAKE SURE WE CAN WRITE TO THE OUTPUT DIRECTORY 1033 mkdir(output) 1034 1035 # THIS IS THE DIRECTORY WHERE THE DATA WILL ULTIMATELY BE COPIED ONCE THE DATAPRODUCT ARE SENT BACK LOCALLY 1036 self.output_path = output 1037 1038 self.output_cache = lal.CacheEntry(ifo, job.name.replace("remoteScan_"+job.dir+"_"+job.tag_base+".sh","wpipeline").upper(), segments.segment(float(time), float(time)), "file://localhost/"+self.output_path+"/"+str(time)) 1039 1040 # ADD FRAME CACHE FILE 1041 #self.add_var_arg("/storage/gpfs_virgo3/virgo/omega/cbc/S6/foreground/RAW/V-raw-930000000-947260815.qcache") 1042 1043 # The first parent node must be the cache file! 1044 input_cache_file = p_nodes[0].localFileName 1045 self.add_var_arg(input_cache_file) 1046 self.add_macro("macroinputfile", input_cache_file) 1047 1048 # NOW WE NEED TO SET UP THE REMOTE OUTPUTPATH 1049 1050 # String used in the naming of the omega scan directory 1051 self.add_var_arg(str(time)) 1052 1053 # Time at which the omega scan is performed 1054 self.add_var_arg(repr(time)) 1055 1056 self.name_output_file = job.dir + "-" + job.tag_base + "-" + repr(time) + ".tgz" 1057 self.add_macro("macrooutputfile", self.name_output_file) 1058 1059 if not opts.disable_dag_categories: 1060 self.set_category(job.name.lower()) 1061 1062 for node in p_nodes: 1063 if node.validNode: 1064 self.add_parent(node) 1065 dag.add_node(self) 1066 self.validate()
1067 1068 1069 # ANALYSEQSCAN NODE
1070 -class analyseQscanNode(pipeline.CondorDAGNode,FUNode):
1071
1072 - def __init__(self, dag, job, cp, opts, time, ifo):
1073 1074 pipeline.CondorDAGNode.__init__(self,job) 1075 1076 name = job.name 1077 1078 if "SEIS" in name: 1079 data_type = "rds" 1080 shortName = "seis_rds" 1081 elif "HT" in name: 1082 data_type = "hoft" 1083 shortName = "ht" 1084 else: 1085 data_type = "rds" 1086 shortName = "rds" 1087 1088 refChannel = figure_out_type(time, ifo, data_type)[1].split(":")[-1] 1089 self.add_var_opt('ref-channel',refChannel) 1090 if cp.has_option('fu-analyse-qscan','generate-qscan-xml'): 1091 self.add_var_opt('generate-qscan-xml','') 1092 self.add_var_opt('z-threshold',cp.getfloat('fu-analyse-qscan','z-threshold')) 1093 if cp.has_option('fu-analyse-qscan','plot-z-distribution'): 1094 self.add_var_opt('plot-z-distribution','') 1095 self.add_var_opt('z-min',cp.getfloat('fu-analyse-qscan','z-min')) 1096 self.add_var_opt('z-max',cp.getfloat('fu-analyse-qscan','z-max')) 1097 self.add_var_opt('z-bins',cp.getfloat('fu-analyse-qscan','z-bins')) 1098 if cp.has_option('fu-analyse-qscan','plot-dt-distribution'): 1099 self.add_var_opt('plot-dt-distribution','') 1100 self.add_var_opt('dt-min',cp.getfloat('fu-analyse-qscan',shortName.replace('_','-') + '-dt-min')) 1101 self.add_var_opt('dt-max',cp.getfloat('fu-analyse-qscan',shortName.replace('_','-') + '-dt-max')) 1102 self.add_var_opt('dt-bins',cp.getfloat('fu-analyse-qscan','dt-bins')) 1103 if cp.has_option('fu-analyse-qscan','plot-z-scattered'): 1104 self.add_var_opt('plot-z-scattered','') 1105 if cp.has_option('fu-analyse-qscan','plot-z-scattered') or cp.has_option('fu-analyse-qscan','plot-dt-distribution'): 1106 self.add_var_opt('ref-channel',refChannel) 1107 self.add_var_opt('ifo-times',ifo) 1108 self.add_var_opt('type',name.upper().replace("ANALYSEQSCAN.PY","WPIPELINE")) 1109 self.add_var_opt('short-type',job.name_for_background.upper().replace("ANALYSEQSCAN.PY","WPIPELINE")+'_') 1110 self.add_var_opt('gps-string',str(time)) 1111 self.add_var_opt('ifo-tag',ifo) 1112 self.add_var_opt('user-tag',str(time).replace('.','_') + "_" + shortName) 1113 1114 self.add_var_opt('qscan-cache-foreground',dag.basename+'.cache') 1115 1116 if cp.has_option('fu-analyse-qscan',ifo+'-background-cache'): 1117 backgroundCache = cp.get('fu-analyse-qscan',ifo+'-background-cache').strip() 1118 else: 1119 backgroundCache = figure_out_cache(time,ifo) 1120 cp.set('fu-analyse-qscan',ifo+'-background-cache',backgroundCache) 1121 self.add_var_opt('qscan-cache-background',backgroundCache) 1122 1123 self.output_file_name = "%s-analyseQscan_%s_%s-unspecified-gpstime.cache" % ( ifo, ifo, str(time).replace('.','_') + "_" + shortName) 1124 self.output_cache = lal.CacheEntry(ifo,job.name.upper(),segments.segment(float(time),float(time)),"file://localhost/"+job.outputPath+'/'+self.output_file_name) 1125 1126 self.setupPlotNode(job) 1127 1128 if not opts.disable_dag_categories: 1129 self.set_category(job.name.lower()) 1130 1131 # add the parents to this node 1132 for node in dag.get_nodes(): 1133 # if node distributeQscanNode is valid and if remote 1134 # ifo is analysed, add distributeQscanNode as parent 1135 #if isinstance(node,distributeQscanNode): 1136 # if node.validNode: 1137 # self.add_parent(node) 1138 # add all qscan nodes of the same type as parents 1139 if isinstance(node,fuQscanNode) or isinstance(node,distribRemoteQscanNode): 1140 if node.validNode: 1141 if (node.scan_type in name and node.scan_ifo == ifo): 1142 self.add_parent(node) 1143 if not (shortName=="ht" and opts.no_ht_analyseQscan) and not (shortName == "rds" and opts.no_rds_analyseQscan) and not (shortName == "seis_rds" and 
opts.no_seismic_analyseQscan) and backgroundCache: 1144 dag.add_node(self) 1145 self.validate() 1146 else: 1147 self.invalidate()
1148 1149 1150 # DATAFIND NODE
1151 -class fuDataFindNode(pipeline.LSCDataFindNode,FUNode):
1152
1153 - def __init__(self, dag, job, cp, opts, ifo, sngl=None, qscan=False, trigger_time=None, data_type="hoft", p_nodes=[]):
1154 1155 self.outputFileName = "" 1156 pipeline.LSCDataFindNode.__init__(self,job) 1157 if qscan: 1158 if sngl: time = sngl.time 1159 else: time = trigger_time 1160 self.outputFileName = self.setup_qscan(job, cp, time, ifo, data_type) 1161 else: 1162 if not sngl: 1163 print >> sys.stderr, "argument \"sngl\" should be provided to class fuDataFindNode" 1164 sys.exit(1) 1165 self.outputFileName = self.setup_inspiral(job, cp, sngl, ifo) 1166 1167 self.output_cache = lal.CacheEntry(ifo, job.name.upper(), segments.segment(self.get_start(), self.get_end()), "file://localhost/"+os.path.abspath(self.outputFileName)) 1168 1169 if not opts.disable_dag_categories: 1170 self.set_category(job.name.lower()) 1171 1172 if not(cp.has_option('fu-remote-jobs','remote-jobs') and job.name in cp.get('fu-remote-jobs','remote-jobs') and cp.has_option('fu-remote-jobs','remote-ifos') and ifo in cp.get('fu-remote-jobs','remote-ifos')) or opts.do_remoteScans: 1173 for node in p_nodes: 1174 if node.validNode: 1175 self.add_parent(node) 1176 if not (data_type=="hoft" and not qscan and opts.no_insp_datafind) and not (data_type=="hoft" and qscan and opts.no_htQscan_datafind) and not (data_type=="rds" and opts.no_rdsQscan_datafind): 1177 dag.add_node(self) 1178 self.validate() 1179 else: 1180 self.invalidate() 1181 else: 1182 self.invalidate()
1183
1184 - def setup_qscan(self, job, cp, time, ifo, data_type):
1185 # 1s is subtracted from the expected startTime to make sure the window 1186 # will be large enough. This is to be sure to handle the rounding to the 1187 # next sample done by qscan. 1188 type, channel = figure_out_type(time,ifo,data_type) 1189 self.set_type(type) 1190 self.q_time = float(cp.get("fu-q-"+data_type+"-datafind",ifo+"-search-time-range"))/2. 1191 self.set_observatory(ifo[0]) 1192 self.set_start(int( time - self.q_time - 1.)) 1193 self.set_end(int( time + self.q_time + 1.)) 1194 lalCache = self.get_output() 1195 qCache = lalCache.rstrip("lcf") + "qcache" 1196 1197 if cp.has_option('fu-remote-jobs','remote-jobs') and job.name in cp.get('fu-remote-jobs','remote-jobs') and cp.has_option('fu-remote-jobs','remote-ifos') and ifo in cp.get('fu-remote-jobs','remote-ifos'): 1198 self.add_var_arg('--server ldr-bologna.phys.uwm.edu') 1199 postScriptTest = "y" 1200 self.localFileName = os.path.basename(qCache) 1201 else: 1202 self.add_var_arg('') 1203 postScriptTest = "n" 1204 1205 self.set_post_script(os.getcwd()+"/cacheconv.sh %s %s %s" %(lalCache,qCache,postScriptTest) ) 1206 return(qCache)
1207
1208 - def setup_inspiral(self, job, cp, sngl, ifo):
1209 # 1s is subtracted from the expected startTime to make sure the window 1210 # will be large enough. This is to be sure to handle the rounding to the 1211 # next sample done by qscan. 1212 type, channel = figure_out_type(sngl.get_gps_start_time(),ifo) 1213 self.set_type(type) 1214 self.set_observatory(ifo[0]) 1215 #FIXME use proper pad, not hardcode to 64 1216 self.set_start(sngl.get_gps_start_time()-64) 1217 self.set_end(sngl.get_gps_end_time()+64) 1218 self.add_var_arg('') 1219 lalCache = self.get_output() 1220 return(lalCache)
1221 1222 1223 # INSPIRAL NODE
1224 -class followUpInspNode(inspiral.InspiralNode,FUNode):
1225 1226 #def __init__(self, inspJob, procParams, ifo, trig, cp,opts,dag, datafindCache, d_node, datafindCommand, type='plot', sngl_table = None):
1227 - def __init__(self, dag, job, cp, opts, sngl, frame_cache, chia, tag, p_nodes=[]):
1228 1229 tlen = 1.0 1230 self.output_file_name = "" 1231 pipeline.CondorDAGNode.__init__(self,job) 1232 pipeline.AnalysisNode.__init__(self) 1233 1234 #FIXME HANDLE INJECTION FILES AND datafind cache 1235 # injFile = self.checkInjections(cp) 1236 # self.set_injections( injFile ) 1237 1238 self.set_trig_start( int(sngl.time - tlen + 0.5) ) 1239 self.set_trig_end( int(sngl.time + tlen + 0.5) ) 1240 if not chia: 1241 self.add_var_opt("write-snrsq","") 1242 self.add_var_opt("write-chisq","") 1243 self.add_var_opt("write-spectrum","") 1244 self.add_var_opt("write-template","") 1245 self.add_var_opt("write-cdata","") 1246 1247 skipParams = ['minimal-match', 'bank-file', 'user-tag', 'injection-file', 'trig-start-time', 'trig-end-time'] 1248 1249 extension = ".xml" 1250 for row in sngl.process_params: 1251 param = row.param.strip("-") 1252 value = row.value 1253 # override some options 1254 if param == 'frame-cache': value = frame_cache 1255 if param == 'snr-threshold': value = "0.1" 1256 if param == 'do-rsq-veto': continue 1257 if param == 'enable-rsq-veto': continue 1258 if param == 'chisq-threshold': value = "1.0e+06" 1259 if param == 'cluster-method': value = 'window' 1260 if param == 'cluster-window': continue 1261 if param == 'userTag': continue 1262 if param == 'user-tag': continue 1263 if param in skipParams: continue 1264 if param == 'channel-name': 1265 self.inputIfo = value[0:2] 1266 #HACK FOR GRB FOLLOWUPS: Channel names defined 1267 #in old GRB runs are obsolete. It is better to 1268 #figure out the channel name from the GPS time. 1269 if opts.do_grb: 1270 type,channel = figure_out_type(sngl.time,self.inputIfo) 1271 value = channel 1272 if param == 'injection-file': value = sngl.inj_file_name 1273 if param == 'gps-end-time': 1274 self.set_end(int(value)) 1275 continue 1276 if param == 'gps-start-time': 1277 self.set_start(int(value)) 1278 continue 1279 if param == 'ifo-tag': 1280 self.set_ifo_tag(value) 1281 continue 1282 self.add_var_opt(param,value) 1283 if param == 'pad-data': 1284 self.set_pad_data(int(value)) 1285 if param == 'write-compress': 1286 extension = '.xml.gz' 1287 1288 self.add_var_opt('cluster-window',str( tlen / 2.)) 1289 self.add_var_opt('disable-rsq-veto',' ') 1290 bankFile = self.write_trig_bank(sngl, 'trig_bank/' + sngl.ifo + '-TRIGBANK_FOLLOWUP_' + repr(sngl.time) + '.xml.gz') 1291 self.set_bank(bankFile) 1292 1293 if chia: 1294 self.set_user_tag( tag.upper() + "_CHIA_FOLLOWUP_" + repr(sngl.time) ) 1295 else: 1296 self.set_user_tag( tag.upper() + "_FOLLOWUP_" + repr(sngl.time) ) 1297 1298 self.output_file_name = job.outputPath + sngl.ifo + "-INSPIRAL_" + self.get_ifo_tag() + "_" + self.get_user_tag() + "-" + str(self.get_start()) + "-" + str(int(self.get_end())-int(self.get_start())) + extension 1299 self.outputCache = sngl.ifo + ' ' + 'INSPIRAL' + ' ' + str(self.get_start()) + ' ' + str(int(self.get_end())-int(self.get_start())) + ' ' + self.output_file_name + '\n' + sngl.ifo + ' ' + 'INSPIRAL-FRAME' + ' ' + str(self.get_start()) + ' ' + str(int(self.get_end())-int(self.get_start())) + ' ' + self.output_file_name.replace(extension,".gwf") + '\n' 1300 1301 self.add_var_opt("output-path",job.outputPath) 1302 self.output_cache = [] 1303 self.output_cache.append(lal.CacheEntry(sngl.ifo, job.name.upper(), segments.segment(float(self.get_start()), float(self.get_end())), "file://localhost/"+self.output_file_name)) 1304 self.output_cache.append(lal.CacheEntry(sngl.ifo, job.name.upper(), segments.segment(float(self.get_start()), float(self.get_end())), 
"file://localhost/"+self.output_file_name.replace(extension,'.gwf'))) 1305 1306 self.output_frame_file = self.output_file_name.replace(extension,'.gwf') 1307 1308 if not opts.disable_dag_categories: 1309 self.set_category(job.name.lower()) 1310 1311 # # Wed-Aug-25-2010:201008251418 Added Pre & Post 1312 # # scripts depends on value of output-path 1313 # patchScript=create_default_config().which("followup_InspiralDataMover.sh") 1314 # self.set_pre_script("%s %s"%(patchScript,job.outputPath)) 1315 # self.set_post_script("%s %s"%(patchScript,job.outputPath)) 1316 # # End temporary additions Wed-Aug-25-2010:201008251421 1317 # #add parents and put node in dag 1318 for node in p_nodes: 1319 if node.validNode: 1320 self.add_parent(node) 1321 if not opts.no_inspiral: 1322 dag.add_node(self) 1323 self.validate() 1324 else: 1325 self.invalidate()
1326
1327 - def write_trig_bank(self,sngl, name):
1328 try: 1329 os.mkdir('trig_bank') 1330 except: pass 1331 xmldoc = ligolw.Document() 1332 xmldoc.appendChild(ligolw.LIGO_LW()) 1333 1334 process_params_table = lsctables.New(lsctables.ProcessParamsTable) 1335 xmldoc.childNodes[-1].appendChild(process_params_table) 1336 1337 sngl_inspiral_table = lsctables.New(lsctables.SnglInspiralTable) 1338 xmldoc.childNodes[-1].appendChild(sngl_inspiral_table) 1339 sngl_inspiral_table.append(sngl.row) 1340 1341 utils.write_filename(xmldoc, name, verbose=False, gz = True) 1342 return name
1343 1344 # Create checklist wiki files etc node
1345 -class makeCheckListWikiNode(pipeline.CondorDAGNode,FUNode):
1346 """ 1347 This class is responsible for running a final job which will 1348 create the default top 10 triggers for each trigger type. 1349 This will place these files into the publication directory so 1350 the user can push wiki content onto the CBC wiki. 1351 """
1352 - def __init__(self,dag,job,cp,opts):
1353 pipeline.CondorDAGNode.__init__(self,job) 1354 #Specify pipe location 1355 self.add_var_opt('followup-directory',cp.get("makeCheckListWiki", 1356 "location").strip()) 1357 #Specify pipe ini file 1358 self.add_var_opt('ini-file',cp.get("makeCheckListWiki", 1359 "ini-file").strip()) 1360 if not opts.disable_dag_categories: 1361 self.set_category(job.name.lower()) 1362 #Add this as child of all known jobs 1363 for parentNode in dag.get_nodes(): 1364 if not isinstance(parentNode,makeFollowupPageNode): 1365 self.add_parent(parentNode) 1366 if not opts.no_makeCheckList: 1367 dag.add_node(self) 1368 self.validate() 1369 else: 1370 self.invalidate()
1371 1372 # Create followup page node
1373 -class makeFollowupPageNode(pipeline.CondorDAGNode,FUNode):
1374 """ 1375 This runs the followup page 1376 """
1377 - def __init__(self,dag,job,cp,opts):
1378 pipeline.CondorDAGNode.__init__(self,job) 1379 #FIXME Specify cache location (sortof hard coded) 1380 self.add_var_arg('followup_pipe.cache') 1381 if not opts.disable_dag_categories: 1382 self.set_category(job.name.lower()) 1383 #Add this as child of all known jobs 1384 for parentNode in dag.get_nodes(): 1385 self.add_parent(parentNode) 1386 dag.add_node(self) 1387 self.validate()
1388 1389 # FIND FLAGS NODE
1390 -class findFlagsNode(pipeline.CondorDAGNode,FUNode):
1391 """ 1392 This class is responsible for setting up a node to perform a 1393 query for the DQ flag for the trigger which is under review. 1394 EXAMPLE 1395 followupQueryDQ.py --window=60,15 --trigger-time=929052945 --output-format=moinmoin --segment-url="https://segdb.ligo.caltech.edu:30015" --output-file=dqResults.wiki 1396 """ 1397 defaults={"section":"findFlags", 1398 "options":{"window":"60,15", 1399 "segment-url":"https://segdb.ligo.caltech.edu", 1400 "output-format":"moinmoin", 1401 "output-file":"dqResults.wiki", 1402 "estimate-background":"", 1403 "background-location":"automatic"} 1404 }
1405 - def __init__(self, dag, job, cp, opts, coincEvent=None):
1406 """ 1407 """ 1408 self.__conditionalLoadDefaults__(findFlagsNode.defaults,cp) 1409 pipeline.CondorDAGNode.__init__(self,job) 1410 self.add_var_opt("trigger-time",coincEvent.time) 1411 #Output filename 1412 oFilename="%s-findFlags_%s_%s.wiki"%(coincEvent.instruments, 1413 coincEvent.ifos, 1414 coincEvent.time) 1415 self.add_var_opt("output-file",job.outputPath+'/DataProducts/'+oFilename) 1416 self.add_var_opt("segment-url",cp.get('findFlags','segment-url')) 1417 self.add_var_opt("output-format",cp.get('findFlags','output-format')) 1418 self.add_var_opt("window",cp.get('findFlags','window')) 1419 if cp.has_option('findFlags','estimate-background'): 1420 self.add_var_opt("estimate-background",cp.get('findFlags','estimate-background')) 1421 if cp.has_option('findFlags','background-location'): 1422 self.add_var_opt("background-location",cp.get('findFlags','background-location')) 1423 if cp.has_option('findFlags','blind'): 1424 self.add_var_opt("blind",cp.get('findFlags','blind')) 1425 self.output_cache = lal.CacheEntry(coincEvent.ifos, job.name.upper(), segments.segment(float(coincEvent.time), float(coincEvent.time)), "file://localhost/"+job.outputPath+'/DataProducts/'+oFilename) 1426 1427 #IFO arg string 1428 myArgString="" 1429 if hasattr(coincEvent, "sngl_inspiral"): 1430 for sngl in coincEvent.sngl_inspiral.itervalues(): 1431 myArgString=myArgString+"%s,"%sngl.ifo 1432 elif hasattr(coincEvent, "ifos_list"): 1433 for ifo in coincEvent.ifos_list: 1434 myArgString=myArgString+"%s,"%ifo 1435 myArgString=myArgString.rstrip(",") 1436 self.add_var_opt("ifo-list",myArgString) 1437 1438 if not opts.disable_dag_categories: 1439 self.set_category(job.name.lower()) 1440 1441 if not opts.no_findFlags: 1442 dag.add_node(self) 1443 self.validate() 1444 else: 1445 self.invalidate()
1446 1447 # FIND VETOS NODE
1448 -class findVetosNode(pipeline.CondorDAGNode,FUNode):
1449 """ 1450 This class is responsible for creating a node in the dag which 1451 queries the segment database for veto segments active around 1452 the trigger time of the candidate. 1453 Command line example: 1454 followupQueryVeto.py --window=60,15 --trigger-time=929052945 --output-format=moinmoin --segment-url="https://segdb.ligo.caltech.edu:30015" --output-file=vetoResults.wiki 1455 """ 1456 defaults={"section":"findVetoes", 1457 "options":{"window":"60,15", 1458 "segment-url":"https://segdb.ligo.caltech.edu", 1459 "output-format":"moinmoin", 1460 "output-file":"vetoResults.wiki", 1461 "estimate