3 # Copyright (C) 2011 Shaun Hooper, Chad Hanna
4 # Copyright (C) 2013-2014 Qi Chu
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License as published by the
8 # Free Software Foundation; either version 2 of the License, or (at your
9 # option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
14 # Public License for more details.
16 # You should have received a copy of the GNU General Public License along
17 # with this program; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 """Stream-based inspiral analysis tool use IIR filters"""
22 # A version of gstlal_inspiral that uses IIR filter banks
24 # + `--psd-fft-length` [s] (int): FFT length, default 16s.
25 # + `--veto-segments-file` [filename]: Set the name of the LIGO light-weight XML file from which to load vetoes (optional).
26 # + `--veto-segments-name` [name]: Set the name of the segments to extract from the segment tables and use as the veto list (default = "vetoes").
# + `--nxydump-segment` [start:stop]: Set the time interval to dump from nxydump elements (optional). The default is ":", i.e. dump all time.
28 # + `--output` [filename]: Set the filename in which to save the triggers (required).
29 # + `--reference-psd` [filename]: Load the spectrum from this LIGO light-weight XML file (required).
30 # + `--track-psd`: Track PSD even if a reference is given
31 # + `--iir-bank` [filename]: Set the name of the LIGO light-weight XML file from which to load the iir template bank (required) format H1:bank1.xml,H2:bank2.xml,L1:bank3.xml,H2:bank4.xml,...
32 # + `--time-slide-file` [filename]: Set the name of the xml file to get time slide offsets.
33 # + `--ht-gate-threshold` [threshold] (float): Set the threshold on whitened h(t) to mark samples as gaps (glitch removal).
34 # + `--chisq-type` [type]: Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.
35 # + `--coincidence-threshold` [value] (float): Set the coincidence window in seconds (default = 0.020). The light-travel time between instruments will be added automatically in the coincidence test.
36 # + `--write-pipeline` [filename]: Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.
38 # + `--check-time-stamps`: Turn on time stamp checking.
39 # + `--verbose`: Be verbose (optional).
40 # + `--tmp-space` [path]: Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.
41 # + `--blind-injections` [filename]: Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case.
42 # + `--svd-bank` [filename]: Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.
43 # + `--control-peak-time` [time] (int): Set a time window in seconds to find peaks in the control signal.
44 # + `--fir-stride` [time] (int): Set the length of the fir filter stride in seconds (default = 8).
# + `--job-tag`: Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc. (required)
46 # + `--likelihood-file` [filename]: Set the name of the likelihood ratio data file to use (optional). If not specified, likelihood ratios will not be assigned to coincs.
47 # + `--marginalized-likelihood-file` [filename]: Set the name of the file from which to load initial marginalized likelihood ratio data (required).
48 # + `--gracedb-far-threshold` (float): False alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent.
# + `--likelihood-snapshot-interval` [seconds] (float): How often to reread the marginalized likelihood data and snapshot the trigger files.
50 # + `--gracedb-search`: gracedb type (default is LowMass).
51 # + `--gracedb-group`: gracedb group (default is Test).
52 # + `--gracedb-pipeline`: gracedb pipeline (default is gstlal-spiir).
53 # + `--thinca-interval` [secs] (float): Set the thinca interval (default = 30s).
56 # =============================================================================
60 # =============================================================================
67 from optparse
import OptionParser
# The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
# Initialize GObject's thread support before any pipeline activity so that
# callbacks from gstreamer threads into Python code are safe.
gobject.threads_init()
82 from gstlal
import bottle
84 from glue
import segments
85 from glue
import segmentsUtils
86 from glue.ligolw
import ligolw
87 from glue.ligolw
import array as ligolw_array
88 from glue.ligolw
import param as ligolw_param
89 from glue.ligolw
import lsctables
90 from glue.ligolw
import utils as ligolw_utils
91 from glue.ligolw.utils
import segments as ligolw_segments
92 from pylal.datatypes
import LIGOTimeGPS
93 from pylal
import series as lalseries
94 from gstlal
import far
95 from gstlal
import inspiral
96 from gstlal
import pipeparts
97 from gstlal
import lloidparts
98 from gstlal
import spiirparts
99 from gstlal
import simplehandler
100 from gstlal
import inspiral
101 from gstlal
import httpinterface
102 from gstlal
import datasource
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	"""Content handler used to parse LIGO light-weight XML documents.

	The use_in() calls below register support for Array, Param and table
	elements so that documents containing them (PSDs, banks, vetoes) can
	be loaded via the ligolw_utils.load_filename() calls in this program.
	"""
	# The class body was empty apart from the (missing) placeholder; the
	# subclass exists only so use_in() can augment it without modifying
	# the library's base handler.
	pass
ligolw_array.use_in(LIGOLWContentHandler)
ligolw_param.use_in(LIGOLWContentHandler)
lsctables.use_in(LIGOLWContentHandler)
def excepthook(*args):
	"""Top-level exception hook: report the traceback via the default hook.

	Installed as sys.excepthook below so that exceptions raised inside
	python call-backs invoked from the gstreamer pipeline are reported.
	"""
	# system exception hook that forces hard exit. without this,
	# exceptions that occur inside python code invoked as a call-back
	# from the gstreamer pipeline just stop the pipeline, they don't
	# cause gstreamer to exit.
	# FIXME: they probably *would* cause if we could figure out why
	# element errors and the like simply stop the pipeline instead of
	# crashing it, as well. Perhaps this should be removed when/if the
	# "element error's don't crash program" problem is fixed
	sys.__excepthook__(*args)
	# NOTE(review): the comment above promises a "hard exit" but no
	# os._exit() call is visible in this excerpt -- confirm a line is not
	# missing here.
sys.excepthook = excepthook
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay

# set the number of processes up to hard limit
maxproc = resource.getrlimit(resource.RLIMIT_NPROC)[1]
resource.setrlimit(resource.RLIMIT_NPROC, (maxproc, maxproc))

# set the total set size up to hard limit
maxas = resource.getrlimit(resource.RLIMIT_AS)[1]
resource.setrlimit(resource.RLIMIT_AS, (maxas, maxas))

# set the stack size per thread to be smaller (the default stack limit
# multiplied by the many threads a gstreamer pipeline spawns can exhaust
# the address space)
maxstack = resource.getrlimit(resource.RLIMIT_STACK)[1]
resource.setrlimit(resource.RLIMIT_STACK, (1 * 1024**2, maxstack))	# 1MB per thread, not 10
144 return XLALUTCToGPS(time.gmtime())
147 # =============================================================================
151 # =============================================================================
def parse_command_line():
	"""Parse the command line, validate options and derive remaining inputs.

	Returns a 5-tuple: (options, filenames, process_params, iir_banks,
	detectors), where process_params is a snapshot of the raw option
	dict (taken before type conversions), iir_banks maps instrument ->
	list of IIR bank filenames, and detectors is the
	datasource.GWDataSourceInfo built from the data-source options.

	NOTE(review): several original source lines are missing from this
	excerpt (initializers for missing_options / required_files /
	bad_options, some guarding "if" statements, the datasource option
	append, and at least one closing parenthesis).  Comments below flag
	the visible symptoms -- confirm against the full file before editing.
	"""
	parser = OptionParser(
		description = __doc__
	# NOTE(review): the closing ")" of the OptionParser(...) call is not
	# visible in this excerpt.

	# append all the datasource specific options
	# NOTE(review): the datasource.append_options(parser)-style call the
	# comment above refers to is not visible in this excerpt.
	parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
	parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
	parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
	parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
	parser.add_option("--output", metavar = "filename", help = "Set the filename in which to save the triggers (required)")
	parser.add_option("--reference-psd", metavar = "filename", help = "load the spectrum from this LIGO light-weight XML file (required).")
	parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given")
	parser.add_option("--iir-bank", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load the iir template bank (required) format H1:bank1.xml,H2:bank2.xml,L1:bank3.xml,H2:bank4.xml,...")
	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets")
	parser.add_option("--ht-gate-threshold", metavar = "threshold", type = "float", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal)")
	parser.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
	parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.020, help = "Set the coincidence window in seconds (default = 0.020). The light-travel time between instruments will be added automatically in the coincidence test.")
	parser.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
	parser.add_option("--comment", metavar="str")
	parser.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
	parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
	parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case")
	#FIXME: in order to be compatible with SVD method, the following parameters are kept though not used for iir filtering
	parser.add_option("--svd-bank", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.")
	parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal")
	parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")

	#FIXME: do not consider online parameters yet
	parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc. required")
	parser.add_option("--likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use (optional). If not specified, likelihood ratios will not be assigned to coincs.")
	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).")
	parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
	parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
	parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb search, default is LowMass")
	parser.add_option("--gracedb-pipeline", default = "gstlal-spiir", help = "gracedb pipeline, default is gstlal-spiir")
	parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default is Test")
	parser.add_option("--thinca-interval", metavar = "secs", type = "float", default = 30.0, help = "Set the thinca interval, default = 30s")

	options, filenames = parser.parse_args()

	# a whitening PSD must come from somewhere: a reference file, on-the-fly
	# measurement, or both
	if options.reference_psd is None and not options.track_psd:
		raise ValueError("must use --track-psd if no reference psd is given, you can use both simultaneously")

	# --blind-injections and --injections are mutually exclusive
	if options.blind_injections and options.injections:
		raise ValueError("must use only one of --blind-injections or --injections")

	required_options = []

	# FIXME: weird way to be compatible with SVD method
	if options.svd_bank is not None:
		options.iir_bank = options.svd_bank

	if options.iir_bank is None:
		# NOTE(review): missing_options is not initialized anywhere in
		# this excerpt -- its "missing_options = []" line appears to be
		# among the omitted lines.
		missing_options += ["--iir-bank"]
	missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
	# NOTE(review): presumably guarded by "if missing_options:" in the full file
	raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options))

	# parse the datasource specific information and do option checking
	detectors = datasource.GWDataSourceInfo(options)
	if len(detectors.channel_dict) < 2:
		raise ValueError("only coincident searches are supported: must process data from at least two antennae")

	# Get the banks and make the detectors
	iir_banks = inspiral.parse_iirbank_string(options.iir_bank)

	# FIXME: should also check for read permissions
	for instrument in iir_banks:
		# NOTE(review): the "required_files = []" initializer is not
		# visible in this excerpt.
		required_files.extend(iir_banks[instrument])
	if options.veto_segments_file:
		required_files += [options.veto_segments_file]
	missing_files = [filename for filename in required_files if not os.path.exists(filename)]
	# NOTE(review): presumably guarded by "if missing_files:" in the full file
	raise ValueError, "files %s do not exist" % ", ".join("'%s'" % filename for filename in sorted(missing_files))

	if options.chisq_type not in ["autochisq", "timeslicechisq"]:
		raise ValueError, "--chisq-type must be one of (autochisq|timeslicechisq), given %s" % (options.chisq_type)

	# do this before converting option types
	process_params = options.__dict__.copy()

	# convert the "start:stop" string into a single (GPS) segment
	options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)

	# Online specific initialization
	if options.data_source in ("lvshm", "framexmit"):
		# check required options in this case
		required_options = ["likelihood_file", "job_tag", "marginalized_likelihood_file"]
		missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
		# NOTE(review): presumably guarded by "if missing_options:" in the full file
		raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options))

		# make an "infinite" extent segment
		detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))

		# this gets set so that if you log into a node you can find out what the job id is easily
		os.environ['GSTLAL_LL_JOB'] = options.job_tag

		# Set up a registry of the resources that this job provides
		host = socket.gethostname()

		# update this as bottle routes are added, can we do this automatically?
		fname = os.path.join(os.getcwd(), os.environ['GSTLAL_LL_JOB'] + "_registry.txt")

		@bottle.route("/registry.txt")
		def register(fname = None):
			# advertise which job/host serves which instruments
			yield "# %s %s %s\n" % (options.job_tag, host, " ".join(set(iir_banks.keys())))
			# First do urls that do not depend on instruments
			for request in ("registry.txt", "gracedb_far_threshold.txt", "latency_histogram.txt", "latency_history.txt", "snr_history.txt", "ram_history.txt", "likelihood.xml", "bank.txt", "segments.xml"):
				# FIXME don't hardcode port number
				# NOTE(review): the loop body (the yielded url line) is
				# missing from this excerpt.
			# Then do instrument dependent urls
			for ifo in set(iir_banks.keys()):
				for request in ("strain_add_drop.txt", "state_vector_on_off_gap.txt", "psd.txt"):
					# FIXME don't hardcode port number
					# NOTE(review): the loop body is missing from this
					# excerpt.
		# NOTE(review): the "open(fname, ...)" that binds f is missing
		# from this excerpt.
		[f.write(l) for l in register(fname)]

	# NOTE(review): the statements below are presumably the offline
	# ("else:") branch, and the "bad_options = []" initializer is missing
	# from this excerpt.
	for option in ["likelihood_file", "job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval"]:
		if getattr(options, option) is not None:
			bad_options.append(option)
	raise ValueError("%s options should only be given for online running" % ",".join(bad_options))

	#FIXME: job tag and output can not be both none
	# if options.job_tag is None and options.output is None:
	# 	raise ValueError("must provide --job-tag or --output for output file naming purpose")

	return options, filenames, process_params, iir_banks, detectors
309 # =============================================================================
313 # =============================================================================
# parse the command line and load all inputs
options, filenames, process_params, iir_banks, detectors = parse_command_line()

# when time-stamp checking is disabled, replace the element factory with a
# pass-through so the rest of the pipeline-building code is unchanged
if not options.check_time_stamps:
	pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src

# Parse the vetos segments file(s) if provided
if options.veto_segments_file is not None:
	veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
# NOTE(review): the "else: veto_segments = None" branch is not visible in
# this excerpt.

# There are three modes for psds in this program
# 1) --reference-psd without --track-psd - a fixed psd (provided by the user) will be used to whiten the data
# 2) --track-psd without --reference-psd - a psd will be measured and used on the fly
# 3) --track-psd with --reference-psd - a psd will be measured on the fly, but the first "guess" will come from the users provided psd
if options.reference_psd is not None:
	psd = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = LIGOLWContentHandler))
	# NOTE(review): an "else:" presumably precedes the next statement in
	# the full file (None placeholders mean "measure the psd on the fly").
	psd = dict((instrument, None) for instrument in detectors.channel_dict)

# load the IIR banks and give each one a unique log name, e.g. "H1bank0"
banks = inspiral.parse_iirbank_files(iir_banks, verbose = options.verbose)
for instrument in banks:
	for n, bank in enumerate(banks[instrument]):
		bank.logname = "%sbank%d" % (instrument,n)

@bottle.route("/bank.txt")
def get_name(banks = banks):
	# report the current GPS time and the template bank filename
	bank = banks.values()[0][0]	#FIXME maybe shouldn't just take the first ones
	# NOTE(review): a trailing "\n" may have been lost from this format
	# string in this excerpt; also "%.4g" applied to a filename looks
	# suspect -- confirm against the full file.
	yield '%.14g %.4g' % (float(now()), bank.template_bank_filename)
# build the pipeline
print >>sys.stderr, "assembling pipeline ...",	# NOTE(review): probably guarded by "if options.verbose:" in the full file
pipeline = gst.Pipeline("gstlal_iir_inspiral")
mainloop = gobject.MainLoop()

# construct the whitening/SPIIR-filtering chain; yields a dict mapping
# instrument -> list of trigger source elements
triggersrc = spiirparts.mkSPIIRmulti(
	# NOTE(review): some keyword arguments (e.g. the pipeline and the
	# banks) are missing from this excerpt.
	detectors = detectors,
	psd_fft_length = options.psd_fft_length,
	ht_gate_threshold = options.ht_gate_threshold,
	veto_segments = veto_segments,
	verbose = options.verbose,
	nxydump_segment = options.nxydump_segment,
	chisq_type = options.chisq_type,
	track_psd = options.track_psd,
	blind_injections = options.blind_injections
	# NOTE(review): the closing ")" is not visible in this excerpt.
print >>sys.stderr, "done"	# NOTE(review): probably guarded by "if options.verbose:"
# Load likelihood ratio data, assume injections are present
if options.likelihood_file is not None:
	# NOTE(review): "likelihood_file" below is unqualified and would raise
	# NameError -- presumably "options.likelihood_file"; confirm against
	# the full file.
	coinc_params_distributions, ranking_data, seglists = far.parse_likelihood_control_doc(ligolw_utils.load_filename(likelihood_file, verbose = options.verbose, contenthandler = far.ThincaCoincParamsDistributions.LIGOLWContentHandler))
	# the livetime segments in the file must cover exactly the analyzed instruments
	assert set(seglists) == set(detectors.channel_dict)
	# NOTE(review): an "else:" presumably precedes the next statement in
	# the full file (start from empty distributions and segments).
	coinc_params_distributions, ranking_data, seglists = far.ThincaCoincParamsDistributions(), None, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in detectors.channel_dict)

# build output document
print >>sys.stderr, "initializing output document ..."	# NOTE(review): probably guarded by "if options.verbose:"
output = inspiral.Data(
	# fall back to a T050017-style name when --output was not given
	filename = options.output or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
	process_params = process_params,
	instruments = set(detectors.channel_dict),
	seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)), # online data doesn't have a segment so make it all possible time
	coincidence_threshold = options.coincidence_threshold,
	coinc_params_distributions = coinc_params_distributions,
	ranking_data = ranking_data,
	marginalized_likelihood_file = options.marginalized_likelihood_file,
	likelihood_file = options.likelihood_file,
	injection_filename = options.injections,
	time_slide_file = options.time_slide_file,
	comment = options.comment,
	tmp_path = options.tmp_space,
	likelihood_snapshot_interval = options.likelihood_snapshot_interval, # seconds
	thinca_interval = options.thinca_interval,
	gracedb_far_threshold = options.gracedb_far_threshold,
	gracedb_group = options.gracedb_group,
	gracedb_search = options.gracedb_search,
	gracedb_pipeline = options.gracedb_pipeline,
	# NOTE(review): the closing ")" and possibly further keyword
	# arguments are not visible in this excerpt.
print >>sys.stderr, "... output document initialized"

# attach the appsinks that deliver triggers from the pipeline to the
# output document (one sink per instrument per bank)
print >>sys.stderr, "attaching appsinks to pipeline ...",	# NOTE(review): probably guarded by "if options.verbose:"
appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
print >>sys.stderr, "attached %d, done" % len(appsinks)
# if we request a dot graph of the pipeline, set it up
if options.write_pipeline is not None:
	# NOTE(review): the body of this branch is missing from this excerpt.

# offline (non-shared-memory) sources are paused first so the pipeline can
# preroll before playback starts
if options.data_source not in ("lvshm", "framexmit"):
	print >>sys.stderr, "setting pipeline state to paused ..."	# NOTE(review): probably guarded by "if options.verbose:"
	if pipeline.set_state(gst.STATE_PAUSED) != gst.STATE_CHANGE_SUCCESS:
		raise RuntimeError, "pipeline did not enter paused state"

# start http interface
# FIXME: don't hard-code port, don't use in this program right now since more than one job runs per machine
httpservers = httpinterface.HTTPServers(16953, verbose = options.verbose)
# setup sigint handler to shutdown pipeline. this is how the program stops
# gracefully, it is the only way to stop it. Otherwise it runs forever
class OneTimeSignalHandler(object):
	"""Signal handler that requests a graceful pipeline shutdown (checkpoint
	message followed by an EOS event) on the first signal, and only reports
	on subsequent signals.

	NOTE(review): the __init__ method (which must store self.pipeline and
	initialize self.count) and parts of the __call__ body (the count
	bookkeeping and the try/except that binds "e") are missing from this
	excerpt -- confirm against the full file before editing.
	"""
	def __call__(self, signum, frame):
		print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
		#FIXME how do I choose a timestamp?
		self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp=now().ns()))
		if not self.pipeline.send_event(gst.event_new_eos()):
			raise Exception("pipeline.send_event(EOS) returned failure")
		# NOTE(review): "e" below comes from a missing "except ... as e:"
		# clause in this excerpt.
		print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
		print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)

signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
# FIXME get rid of this someday: Repair the shared memory just in case before starting
# NOTE(review): probably restricted to online (lvshm) running in the full file
for partition in ("LHO_Data", "LLO_Data", "VIRGO_Data"):
	subprocess.call(["smrepair", partition])

print >>sys.stderr, "setting pipeline state to playing ..."	# NOTE(review): probably guarded by "if options.verbose:"
if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
	raise RuntimeError, "pipeline did not enter playing state"

if options.write_pipeline is not None:
	# NOTE(review): the body of this branch (the DOT graph dump of the
	# playing pipeline) is missing from this excerpt.

print >>sys.stderr, "running pipeline ..."	# NOTE(review): the mainloop.run() call is not visible in this excerpt

# write the final trigger document; fall back to a T050017-style name when
# --output was not given
output.write_output_file(filename = options.output or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), likelihood_file = options.likelihood_file, verbose = options.verbose)

if options.data_source in ("lvshm", "framexmit"):
	sys.exit(1) # online pipeline always ends with an error code