3 # Copyright (C) 2011 Chad Hanna
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13 # Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20 # This program creates an HTCondor DAG to automate the running of low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe
23 This program makes a dag for a gstlal inspiral low latency pipeline
29 # import standard modules and append the lalapps prefix to the python path
32 import sys, os, copy, stat
36 # import the modules we need to build the pipeline
41 import glue.ligolw.utils as utils
42 from optparse
import OptionParser
43 from gstlal
import inspiral
44 from gstlal
import inspiral_pipe
45 from gstlal
import dagparts
46 from gstlal
import datasource
49 # ### Graph of the HTCondor DAG
51 # - gray boxes are optional and depend on the command line given
59 # node [shape=record fontsize=10 fontname="Verdana"];
60 # edge [fontsize=8 fontname="Verdana"];
61 # gstlal_inspiral [URL="\ref gstlal_inspiral"];
62 # gstlal_llcbcsummary [URL="\ref gstlal_llcbcsummary"];
63 # gstlal_llcbcnode [URL="\ref gstlal_llcbcnode"];
64 # gstlal_inspiral_marginalize_likelihoods_online [URL="\ref gstlal_inspiral_marginalize_likelihoods_online"];
65 # lvalert_listen [style=filled, color=lightgrey, URL="https://www.lsc-group.phys.uwm.edu/daswg/docs/howto/lvalert-howto.html"];
71 # - Typical usage case
73 # ### Command line options
75 # "--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
76 # "--reference-psd", metavar = "filename", help = "Set the reference psd file.")
77 # "--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
78 # "--channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the channel to process (optional). The default is \"LSC-STRAIN\" for all detectors. Override with IFO=CHANNEL-NAME can be given multiple times")
79 # "--dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the DQ channel to process (required).")
80 # "--framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process (required). IFO=ADDR:port can be given multiple times.")
81 # "--framexmit-iface", metavar = "name", help = "Set the interface address to process (required).")
82 # "--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process (optional). IFO=CHANNEL-NAME can be given multiple times.")
83 # "--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
84 # "--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
85 # "--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
86 # "--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
87 # "--do-iir-pipeline", action = "store_true", help = "run the iir pipeline instead of lloid")
88 # "--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
89 # "--likelihood-cache", help = "set the cache containing likelihood files")
90 # "--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")
91 # "--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
92 # "--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
93 # "--thinca-interval", default = 10, metavar = "secs", help = "set the thinca interval, default 10")
94 # "--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
95 # "--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
96 # "--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
97 # "--gracedb-group", default = "Test", help = "gracedb group, default Test")
98 # "--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
99 # "--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
100 # "--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
101 # "--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
102 # "--data-source", metavar = "[lvshm|]", default = "lvshm", help = "Where to get the data from. Default lvshm")
103 # "--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
104 # "--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
105 # "--state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
106 # "--state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
107 # "--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
108 # "--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
109 # "--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
110 # "--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
111 # "--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihood data and snapshot the trigger files.")
class lvalert_listen_job(inspiral_pipe.generic_job):
    """
    A local-universe job that runs an lvalert listener.

    Constructing the job also writes two helper files into the current
    working directory: the executable script lvalert.sh and the
    configuration file lvalert.ini that points at it.
    """
    def __init__(self, program, gracedb_group = "CBC", gracedb_search = "LowMass", gracedb_pipeline = "gstlal", progs = ("gstlal_inspiral_lvalert_psd_plotter", "gstlal_inspiral_followups_from_gracedb")):
        """
        program -- executable name handed to inspiral_pipe.generic_job
        gracedb_group, gracedb_pipeline, gracedb_search -- lower-cased and
            joined with underscores to form the section name in lvalert.ini
        progs -- programs that each receive a copy of the script's stdin;
            every name is resolved with dagparts.which()
        """
        inspiral_pipe.generic_job.__init__(self, program, universe = "local")

        # produce the lvalert processor: a bash script that tees its stdin
        # into one process substitution per listener program.  The file is
        # closed (and therefore flushed) before its mode is changed.
        with open("lvalert.sh", "w") as script:
            script.write("#!/bin/bash \n")
            script.write('cat <&0 | tee ')
            for prog in progs:
                script.write(">(%s) " % dagparts.which(prog))
        # add the owner-execute bit while keeping the existing mode bits
        os.chmod('lvalert.sh', os.stat('lvalert.sh').st_mode | stat.S_IEXEC)

        with open("lvalert.ini", "w") as config:
            #FIXME gracedb server code sets up nodes based on this convention
            config.write("[%s_%s_%s]\n" % (gracedb_group.lower(), gracedb_pipeline.lower(), gracedb_search.lower()))
            config.write("executable=./lvalert.sh")
# DAG node for an lvalert listener job.  The lines shown here set four
# variable options on the node: "username" and "password" (both read
# interactively from stdin), a fixed "server" host name, and the
# "config-file" lvalert.ini.
140 class lvalert_listen_node(pipeline.CondorDAGNode):
# job -- the job this node is created for
# dag -- the DAG being assembled
# NOTE(review): neither argument is used in the lines shown here; confirm that
# the base-class initialisation and the registration with the DAG are present.
144 def __init__(self, job, dag):
# Prompting happens while the DAG is being generated, so the script blocks
# here until the user answers on stdin.
146 self.add_var_opt("username", raw_input("lvalert username: "))
# NOTE(review): raw_input echoes what is typed; getpass.getpass() would keep
# the password off the terminal.  The value is also stored as a plain node
# option -- confirm that is acceptable.
147 self.add_var_opt("password", raw_input("lvalert password: "))
148 self.add_var_opt("server", "lvalert.cgca.uwm.edu")
149 self.add_var_opt("config-file", "lvalert.ini")
154 # Parse the command line
# Define every command line option, parse the command line and return the
# parsed options together with the positional arguments and the bank cache,
# channel, DQ-channel and framexmit dictionaries (plus their injection-stream
# counterparts) named in the return statement.
# Raises ValueError when a required option is missing or when injection
# channels are given for a different set of detectors than the regular ones.
158 def parse_command_line():
# The module docstring doubles as the --help description.
159 parser = OptionParser(description = __doc__)
160 parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
161 parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.")
162 parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
163 parser.add_option("--channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the channel to process (optional). The default is \"LSC-STRAIN\" for all detectors. Override with IFO=CHANNEL-NAME can be given multiple times")
164 parser.add_option("--dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the DQ channel to process (required).")
165 parser.add_option("--framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process (required). IFO=ADDR:port can be given multiple times.")
# NOTE(review): --framexmit-iface stores a single value, while
# --inj-framexmit-iface below uses action = "append" (a list, or None when the
# flag is absent).  Confirm the asymmetry is intended.
166 parser.add_option("--framexmit-iface", metavar = "name", help = "Set the interface address to process (required).")
167 parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process (optional). IFO=CHANNEL-NAME can be given multiple times.")
168 parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
169 parser.add_option("--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
170 parser.add_option("--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
171 parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
172 parser.add_option("--do-iir-pipeline", action = "store_true", help = "run the iir pipeline instead of lloid")
# No default is given for --max-jobs, so options.max_jobs is None unless the
# flag is supplied.
173 parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
174 parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")
175 parser.add_option("--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")
# The next three options declare no type, so a value given on the command line
# arrives as a string while the default stays an int.
176 parser.add_option("--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
177 parser.add_option("--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
178 parser.add_option("--thinca-interval", default = 10, metavar = "secs", help = "set the thinca interval, default 10")
179 parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
180 parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
181 parser.add_option("--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
182 parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default Test")
183 parser.add_option("--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
184 parser.add_option("--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
185 parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
186 parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
187 parser.add_option("--data-source", metavar = "[lvshm|]", default = "lvshm", help = "Where to get the data from. Default lvshm")
188 parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
189 parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
190 parser.add_option("--state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
191 parser.add_option("--state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
192 parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
193 parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
194 parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
195 parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
196 parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
198 options, filenames = parser.parse_args()
# Required options: each one left unset adds a line to the error text, and a
# single ValueError reports them all.
# NOTE(review): `fail` has to be an empty string before this loop; its
# initialisation is not among the lines shown here.
# NOTE(review): the --marginalized-likelihood-file help text says "required",
# but that option is not in the tuple checked here.
201 for option in ("bank_cache", "gracedb_far_threshold"):
202 if getattr(options, option) is None:
203 fail += "must provide option %s\n" % (option)
204 if fail: raise ValueError, fail
206 #FIXME add consistency check?
# Merge the IFO=ADDR:port values from --framexmit-addr into framexmit_dict.
211 framexmit_dict.update(datasource.framexmit_dict_from_framexmit_list(options.framexmit_addr))
# Injection channels are optional, but when present they must cover exactly the
# same detectors as the regular channels.
216 if inj_channel_dict and not ( set(inj_channel_dict.keys()) == set(channel_dict.keys()) ):
217 raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")
# Attach the URL of every cache line to the options object as likelihood_files.
# NOTE(review): --likelihood-cache is not one of the required options checked
# above, so options.likelihood_cache may be None here; the file object is also
# never closed explicitly.
221 options.likelihood_files = [lal.CacheEntry(line).url for line in open(options.likelihood_cache)]
223 return options, filenames, bankcache, channel_dict, dq_channel_dict, framexmit_dict, inj_channel_dict, inj_dq_channel_dict, inj_framexmit_dict
# Driver: parse the command line, then create the job objects (and the lvalert
# listener node) that the rest of the script uses to populate the DAG.
231 options, filenames, bank_cache, channel_dict, dq_channel_dict, framexmit_dict, inj_channel_dict, inj_dq_channel_dict, inj_framexmit_dict = parse_command_line()
# Create the "logs" directory.
# NOTE(review): the handler that belongs to this try is not among the lines
# shown here.
233 try: os.mkdir("logs")
239 # setup the job classes
243 # Figure out if it is iir or not
# --do-iir-pipeline is a store_true flag without a default, so the attribute is
# None unless the flag was given.  The two assignments below select the
# gstlal_iir_inspiral or the gstlal_inspiral executable with identical condor
# commands.
244 if options.do_iir_pipeline is not None:
245 gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_iir_inspiral', condor_commands = {"want_graceful_removal":"True", "kill_sig":"15", "+Online_CBC_SVD":"True", "Requirements":"(TARGET.Online_CBC_SVD =?= True)"})
247 gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_inspiral', condor_commands = {"want_graceful_removal":"True", "kill_sig":"15", "+Online_CBC_SVD":"True", "Requirements":"(TARGET.Online_CBC_SVD =?= True)"})
# The injection job always uses gstlal_inspiral, under its own tag_base.
249 gstlalInspiralInjJob = inspiral_pipe.generic_job('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = {"want_graceful_removal":"True", "kill_sig":"15", "+Online_CBC_SVD":"True", "Requirements":"(TARGET.Online_CBC_SVD =?= True)"})
250 # A local universe job that will run in a loop marginalizing all of the likelihoods
251 margJob = inspiral_pipe.generic_job('gstlal_inspiral_marginalize_likelihoods_online', universe = "local")
252 # an lvalert_listen job
# Constructing this job writes lvalert.sh and lvalert.ini in the current
# directory (see lvalert_listen_job.__init__).
253 listenJob = lvalert_listen_job("lvalert_listen", gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program)
# Local universe job built from the gstlal_ll_inspiral_get_urls executable.
255 urlsJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_get_urls", universe = "local")
259 # loop over banks to run gstlal inspiral pre clustering and far computation
# Creating the node prompts for the lvalert username and password on stdin.
263 listenNode = lvalert_listen_node(listenJob, dag)
# --max-jobs is optional and has no default, so options.max_jobs is None when
# the flag is omitted; subtracting 1 from None would raise a TypeError.  Only
# derive a limit when one was requested.
# NOTE(review): this passes None for "no limit" -- confirm that
# inspiral_pipe.build_bank_groups accepts None for its job-limit argument.
if options.max_jobs is not None:
    max_bank_jobs = options.max_jobs - 1
else:
    max_bank_jobs = None
bank_groups = list(inspiral_pipe.build_bank_groups(bank_cache, [1], max_bank_jobs))
# every bank group is paired with exactly one likelihood file further down
if len(options.likelihood_files) != len(bank_groups):
    raise ValueError("Likelihood files must correspond 1:1 with bank files")
# Create one inspiral node per bank group, pairing each group with the
# likelihood file at the same position; num_insp_nodes is the running index.
# NOTE(review): jobTags and inj_jobTags are appended to below, but their
# initialisation is not among the lines shown here.
272 for num_insp_nodes, (svd_banks, likefile) in enumerate(zip(bank_groups, options.likelihood_files)):
# Comma-separated KEY:VALUE pairs, one per item of svd_banks, using the first
# element of each value.
273 svd_bank_string = ",".join([":".join([k, v[0]]) for k,v in svd_banks.items()])
# Zero-padded four-digit tag identifying this node.
274 jobTags.append("%04d" % num_insp_nodes)
# Non-injection node: regular channels, DQ channels, framexmit addresses and
# gracedb settings.
276 inspNode = inspiral_pipe.generic_node(gstlalInspiralJob, dag, [],
277 opts = {"psd-fft-length":options.psd_fft_length,
278 "ht-gate-threshold":options.ht_gate_threshold,
279 "channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
280 "dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
281 "state-vector-on-bits":options.state_vector_on_bits,
282 "state-vector-off-bits":options.state_vector_off_bits,
283 "framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
284 "framexmit-iface":options.framexmit_iface,
285 "svd-bank":svd_bank_string,
286 "tmp-space":inspiral_pipe.condor_scratch_space(),
288 "control-peak-time":options.control_peak_time,
289 "coincidence-threshold":options.coincidence_threshold,
290 "fir-stride":options.fir_stride,
291 "data-source":options.data_source,
292 "gracedb-far-threshold":options.gracedb_far_threshold,
293 "gracedb-group":options.gracedb_group,
294 "gracedb-pipeline":options.gracedb_pipeline,
295 "gracedb-search":options.gracedb_search,
296 "thinca-interval":options.thinca_interval,
297 "job-tag":jobTags[-1],
298 "likelihood-snapshot-interval":options.likelihood_snapshot_interval
300 input_files = {"marginalized-likelihood-file":options.marginalized_likelihood_file},
# This node lists likefile as an output ("likelihood-file").
301 output_files = {"output":"not_used.xml.gz",
302 "likelihood-file":likefile
307 # FIXME The node number for injection jobs currently follows the same
308 # numbering system as non-injection jobs, except instead of starting at
309 # 0000 the numbering starts at 1000. There is probably a better way to
310 # do this in the future, this system was just the simplest to start
312 inj_jobTags.append("%04d" % (num_insp_nodes + 1000))
# Injection node: same bank and common settings, but the inj_* channels,
# addresses, state-vector bits and gracedb settings.
313 inspInjNode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, [],
314 opts = {"psd-fft-length":options.psd_fft_length,
315 "ht-gate-threshold":options.ht_gate_threshold,
316 "channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_channel_dict),
317 "dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
318 "state-vector-on-bits":options.inj_state_vector_on_bits,
319 "state-vector-off-bits":options.inj_state_vector_off_bits,
320 "framexmit-addr":datasource.framexmit_list_from_framexmit_dict(inj_framexmit_dict),
321 "framexmit-iface":options.inj_framexmit_iface,
322 "svd-bank":svd_bank_string,
323 "tmp-space":inspiral_pipe.condor_scratch_space(),
325 "control-peak-time":options.control_peak_time,
326 "coincidence-threshold":options.coincidence_threshold,
327 "fir-stride":options.fir_stride,
328 "data-source":options.data_source,
329 "gracedb-far-threshold":options.inj_gracedb_far_threshold,
330 "gracedb-group":options.inj_gracedb_group,
331 "gracedb-pipeline":options.inj_gracedb_pipeline,
332 "gracedb-search":options.inj_gracedb_search,
333 "thinca-interval":options.thinca_interval,
334 "job-tag":inj_jobTags[-1],
335 "likelihood-snapshot-interval":options.likelihood_snapshot_interval
# The likelihood file produced by the non-injection node is only read here, as
# the reference likelihood file.
337 input_files = {"marginalized-likelihood-file":options.marginalized_likelihood_file,
338 "reference-likelihood-file":[likefile]},
339 output_files = {"output":"not_used.xml.gz",
# The urls node receives every job tag (regular and injection) joined with
# spaces, under an empty option name.
343 urlsNode = inspiral_pipe.generic_node(urlsJob, dag, [], opts = {}, input_files = {"":" ".join(jobTags + inj_jobTags)}, output_files = {})
# The marginalization node receives the marginalized likelihood file followed
# by one "<tag>_registry.txt" per non-injection job tag.
344 margNode = inspiral_pipe.generic_node(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {})
348 # Write out the dag and other files
# Write out the sub files for the jobs in the dag.
dag.write_sub_files()
# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
# There are few enough total jobs that this really shouldn't bog down the
# scheduler. For now 10000 will be considered indefinite
# A plain loop is used because set_retry is called only for its side effect;
# no list of return values is needed.
for node in dag.get_nodes():
    node.set_retry(10000)
364 # set up the webpage cgi scripts
365 # FIXME don't hardcode this stuff
# Copy the two monitoring CGI programs, located with dagparts.which(), into
# the user's ~/public_html/cgi-bin directory.
# NOTE(review): shutil is not imported in the import lines shown at the top of
# this file -- confirm it is imported before this point.
369 shutil.copy2(dagparts.which('gstlal_llcbcsummary'), os.path.expanduser("~/public_html/cgi-bin"))
370 shutil.copy2(dagparts.which('gstlal_llcbcnode'), os.path.expanduser("~/public_html/cgi-bin"))
# Tell the user on stderr where the analysis can be monitored.
# NOTE(review): both messages below stop after "https:" and have no closing
# quote in this copy of the file; the rest of each statement needs restoring.
371 print >>sys.stderr, "\n\n NOTE! You can monitor the analysis at this url: https:
373 print >>sys.stderr, "\n\n NOTE! You can monitor the injection analysis at this url: https: