# Copyright (C) 2009-2014 Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""Stream-based inspiral analysis tool"""
#
# =============================================================================
#
#
# =============================================================================
#
## @file gstlal_inspiral
# A program to analyze gravitational wave data for compact binary coalescence in real time or in an offline mode
#
# label="gstlal_inspiral"
# graph [fontname="Roman", fontsize=24];
# edge [ fontname="Roman", fontsize=10 ];
# node [fontname="Roman", shape=box, fontsize=11];
#
# gracedb [label="GW\nCandidate\nDatabase", shape=oval, color=tomato3, style=filled];
#
# subgraph clusterNodeN {
#
# label="gstreamer pipeline";
#
# H1src [label="H1 data source:\n mkbasicsrc()", color=red4, URL="\ref pipeparts.mkbasicsrc()"];
# L1src [label="L1 data source:\n mkbasicsrc()", color=green4, URL="\ref pipeparts.mkbasicsrc()"];
# V1src [label="V1 data source:\n mkbasicsrc()", color=magenta4];
#
# H1multirate [label="H1 whitening and downsampling:\nmkwhitened_multirate_src()", color=red4, URL="\ref multirate_datasource.mkwhitened_multirate_src()"];
# L1multirate [label="L1 whitening and downsampling:\nmkwhitened_multirate_src()", color=green4, URL="\ref multirate_datasource.mkwhitened_multirate_src()"];
# V1multirate [label="V1 whitening and downsampling:\nmkwhitened_multirate_src()", color=magenta4, URL="\ref multirate_datasource.mkwhitened_multirate_src()"];
#
# H1LLOID [label="H1 LLOID filtering engine:\nmkLLOIDmulti()", color=red4, URL="\ref lloidparts.mkLLOIDmulti()"];
# L1LLOID [label="L1 LLOID filtering engine:\nmkLLOIDmulti()", color=green4, URL="\ref lloidparts.mkLLOIDmulti()"];
# V1LLOID [label="V1 LLOID filtering engine:\nmkLLOIDmulti()", color=magenta4, URL="\ref lloidparts.mkLLOIDmulti()"];
#
# H1Trig1 [label="H1 Triggering:\nsub bank 1", color=red4];
# L1Trig1 [label="L1 Triggering:\nsub bank 1", color=green4];
# V1Trig1 [label="V1 Triggering:\nsub bank 1", color=magenta4];
# H1Trig2 [label="H1 Triggering:\nsub bank 2", color=red4];
# L1Trig2 [label="L1 Triggering:\nsub bank 2", color=green4];
# V1Trig2 [label="V1 Triggering:\nsub bank 2", color=magenta4];
# H1TrigN [label="H1 Triggering:\nsub bank N", color=red4];
# L1TrigN [label="L1 Triggering:\nsub bank N", color=green4];
# V1TrigN [label="V1 Triggering:\nsub bank N", color=magenta4];
#
# H1src -> H1multirate;
# L1src -> L1multirate;
# V1src -> V1multirate;
#
# H1multirate -> H1LLOID [label="h(t) 4096Hz"];
# L1multirate -> L1LLOID [label="h(t) 4096Hz"];
# V1multirate -> V1LLOID [label="h(t) 4096Hz"];
# H1multirate -> H1LLOID [label="h(t) 2048Hz"];
# L1multirate -> L1LLOID [label="h(t) 2048Hz"];
# V1multirate -> V1LLOID [label="h(t) 2048Hz"];
# H1multirate -> H1LLOID [label="h(t) Nth-pow-of-2 Hz"];
# L1multirate -> L1LLOID [label="h(t) Nth-pow-of-2 Hz"];
# V1multirate -> V1LLOID [label="h(t) Nth-pow-of-2 Hz"];
#
# H1LLOID -> H1Trig1 [label="SNRs sub bank 1"];
# L1LLOID -> L1Trig1 [label="SNRs sub bank 1"];
# V1LLOID -> V1Trig1 [label="SNRs sub bank 1"];
# H1LLOID -> H1Trig2 [label="SNRs sub bank 2"];
# L1LLOID -> L1Trig2 [label="SNRs sub bank 2"];
# V1LLOID -> V1Trig2 [label="SNRs sub bank 2"];
# H1LLOID -> H1TrigN [label="SNRs sub bank N"];
# L1LLOID -> L1TrigN [label="SNRs sub bank N"];
# V1LLOID -> V1TrigN [label="SNRs sub bank N"];
#
# Coincidence [label="Coincidence\nO(1)s latency"];
# SigEst [label="Significance\nEstimation\nO(1)s latency"];
# Thresh [label="Thresholding\nO(1)s latency"];
# EventGen [label="Event\nGeneration\nO(1)s latency"];
#
# H1Trig1 -> Coincidence [label="Trigs sub bank 1"];
# L1Trig1 -> Coincidence [label="Trigs sub bank 1"];
# V1Trig1 -> Coincidence [label="Trigs sub bank 1"];
# H1Trig2 -> Coincidence [label="Trigs sub bank 2"];
# L1Trig2 -> Coincidence [label="Trigs sub bank 2"];
# V1Trig2 -> Coincidence [label="Trigs sub bank 2"];
# H1TrigN -> Coincidence [label="Trigs sub bank N"];
# L1TrigN -> Coincidence [label="Trigs sub bank N"];
# V1TrigN -> Coincidence [label="Trigs sub bank N"];
#
# Coincidence -> SigEst -> Thresh -> EventGen;
#
# EventGen -> gracedb;
# ### Command line interface
#
# + `--local-frame-caching`: Cache frame data to a local temporary area before filtering to reduce I/O load when running offline.
# + `--psd-fft-length` [s] (int): FFT length, default 16s.
# + `--veto-segments-file` [filename]: Set the name of the LIGO light-weight XML file from which to load vetoes (optional).
# + `--veto-segments-name` [name]: Set the name of the segments to extract from the segment tables and use as the veto list, default = "vetoes".
# + `--nxydump-segment` [start:stop]: Set the time interval to dump from nxydump elements (optional). The default is ":", i.e. dump all time.
# + `--output` [filename]: Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).
# + `--reference-psd` [filename]: Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).
# + `--track-psd`: Track PSD even if a reference is given.
# + `--svd-bank` [filename]: Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument, in the form ifo:file. These can be given as a comma-separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze banks serially. At least one svd bank for at least 2 detectors is required.
# + `--time-slide-file` [filename]: Set the name of the xml file from which to get time slide offsets.
# + `--control-peak-time` [time] (int): Set a time window in seconds to find peaks in the control signal.
# + `--fir-stride` [time] (int): Set the length of the fir filter stride in seconds, default = 8.
# + `--ht-gate-threshold` [threshold] (float): Set the threshold on whitened h(t) to mark samples as gaps (glitch removal), default = infinity.
# + `--chisq-type` [type]: Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.
# + `--coincidence-threshold` [value] (float): Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.
# + `--write-pipeline` [filename]: Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.
# + `--comment`: Set the string to be recorded in comment and tag columns in various places in the output file (optional).
# + `--check-time-stamps`: Turn on time stamp checking.
# + `--verbose`: Be verbose (optional).
# + `--tmp-space` [path]: Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.
# + `--blind-injections` [filename]: Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case.
# + `--job-tag`: Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.
# + `--likelihood-file` [filename]: Set the name of the likelihood ratio data file to use for ranking events (either --likelihood-file or --reference-likelihood-file must be provided).
# + `--reference-likelihood-file` [filename]: Set the name of the likelihood ratio data file to use for ranking events (--data-source must be lvshm or framexmit; --likelihood-snapshot-interval must be provided; either --likelihood-file or --reference-likelihood-file must be provided).
# + `--likelihood-snapshot-interval` [seconds] (float): How often to reread the marginalized likelihood data. If --likelihood-file is provided, the likelihood file will be overwritten by a snapshot of the trigger files and a duplicate snapshot will be generated to keep a record of past ranking statistics.
# + `--marginalized-likelihood-file` [filename]: Set the name of the file from which to load initial marginalized likelihood ratio data (required).
# + `--gracedb-far-threshold` (float): False alarm rate threshold for gracedb (Hz); if not given, gracedb events are not sent.
# + `--gracedb-search`: gracedb search type (default is LowMass).
# + `--gracedb-pipeline`: gracedb pipeline (default is gstlal).
# + `--gracedb-group`: gracedb group (default is Test).
# + `--thinca-interval` [secs] (float): Set the thinca interval, default = 4s.
# + `--singles-threshold` [SNR] (float): Set the SNR threshold at which to record single-instrument events in the output (default = 8).
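#
# ### Example invocation
#
# A rough sketch of an offline run. The file names below are placeholders, and
# the `--data-source`, `--frame-cache` and `--channel-name` options are assumed
# to be supplied by the shared datasource option group appended in
# parse_command_line():
#
#		gstlal_inspiral \
#			--data-source frames \
#			--frame-cache frames.cache \
#			--channel-name H1=FAKE-STRAIN \
#			--channel-name L1=FAKE-STRAIN \
#			--reference-psd reference_psd.xml.gz \
#			--svd-bank H1:H1-SVD_BANK.xml.gz,L1:L1-SVD_BANK.xml.gz \
#			--likelihood-file likelihood.xml.gz \
#			--marginalized-likelihood-file marginalized_likelihood.xml.gz \
#			--time-slide-file tisi.xml \
#			--output triggers.xml.gz \
#			--verbose
#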
# ### Review status
#
# | Names                                          | Hash                                     | Date       |
# | ---------------------------------------------- | ---------------------------------------- | ---------- |
# | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 7536db9d496be9a014559f4e273e1e856047bf71 | 2014-05-02 |
#
# ### Action items
#
# - Consider cleaning up the nxydump segment option. Currently it only works by modifying the source code.
# - Consider changing the thinca-interval name to thinca-cadence.
# - Consider replacing the 'in ("framexmitsrc", "lvshmsrc")' tests with a property or method in the data source class.
# - Consider allowing a single detector analysis.
# - Consider deleting timeslicechisq.
import math
import os
import resource
import signal
import socket
import sys
import tempfile
import time
from optparse import OptionParser
from collections import namedtuple

# The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
import gobject
gobject.threads_init()
import pygst
pygst.require("0.10")
import gst

import lal

from glue import segments
from glue import segmentsUtils
from glue.ligolw import ligolw
from glue.ligolw import lsctables
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import segments as ligolw_segments
from pylal.datatypes import LIGOTimeGPS
from pylal import series as lalseries
from gstlal import bottle
from gstlal import datasource
from gstlal import lloidparts
from gstlal import far
from gstlal import httpinterface
from gstlal import hoftcache
from gstlal import inspiral
from gstlal import pipeparts
from gstlal import simulation
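#
# use a content handler that knows about the LSC table definitions when
# loading LIGO light-weight XML documents
#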
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	pass
lsctables.use_in(LIGOLWContentHandler)
#
# Make sure we have sufficient resources
# We allocate far more memory than we need, so this is okay
#

def setrlimit(res, lim):
	hard_lim = resource.getrlimit(res)[1]
	resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))

# set the number of processes and total set size up to hard limit and
# shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)
setrlimit(resource.RLIMIT_STACK, 1024 * 1024)	# 1 MiB per thread
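#
# return the current UTC wall-clock time as a LIGOTimeGPS
#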
def now():
	return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)
#
# =============================================================================
#
#
# =============================================================================
#
def parse_command_line():
	parser = OptionParser(
		description = __doc__
	)

	# append all the datasource specific options
	datasource.append_options(parser)

	# local caching to help with I/O for offline running
	parser.add_option("--local-frame-caching", action = "store_true", help = "Cache frame data to a local temporary area before filtering to reduce I/O load when running offline.")

	parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
257 parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
258 parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
259 parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
260 parser.add_option("--output", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
261 parser.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
262 parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given")
263 parser.add_option("--svd-bank", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file, These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze bank serially. At least one svd bank for at least 2 detectors is required.")
264 parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets")
265 parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal")
266 parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
267 parser.add_option("--ht-gate-threshold", metavar = "threshold", type = "float", default = float("inf"), help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal)")
268 parser.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
269 parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
270 parser.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
271 parser.add_option("--comment", help = "Set the string to be recorded in comment and tag columns in various places in the output file (optional).")
272 parser.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
273 parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
274 parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
275 parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case")
279 parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc..")
280 parser.add_option("--likelihood-file", metavar = "filename", action = "append", help = "Set the name of the likelihood ratio data file to use for ranking events (either --likelihood-file or --reference-likelihood-file must be provided)")
281 parser.add_option("--reference-likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use for ranking events (--data-source must be lvshm or framexmit) (--likelihood-snapshot-interval must provided) (either --likelihood-file or --reference-likelihood-file must be provided)")
282 parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data. If --likelihood-file is provided, the likelihood file will be overwritten by a snapshot of the trigger files and a duplicate snapshot will be generated to keep a record of past ranking statistics.")
283 parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).")
284 parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
285 parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb search, default is LowMass")
286 parser.add_option("--gracedb-pipeline", default = "gstlal", help = "gracedb pipeline, default is gstlal")
287 parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default is Test")
288 parser.add_option("--thinca-interval", metavar = "secs", type = "float", default = 4.0, help = "Set the thinca interval, default = 4s")
289 # NOTE: gstlal_inspiral_calc_likelihood searches for this option
290 # in the process_params table to determine the threshold below
291 # which it can delete uninteresting singles after the coincs are
292 # ranked. if the name of this option is changed, be sure to update
293 # gstlal_inspiral_calc_likelihood
294 parser.add_option("--singles-threshold", metavar = "SNR", type = "float", default = 8.0, help = "Set the SNR threshold at which to record single-instrument events in the output (default = 8).")
	options, filenames = parser.parse_args()

	#
	# check for options and files that are always required
	#

	missing_options = ["--%s" % option.replace("_", "-") for option in ["svd_bank", "output"] if getattr(options, option) is None]
	if options.likelihood_file is None and options.reference_likelihood_file is None:
		missing_options.append("either --likelihood-file or --reference-likelihood-file")

	if missing_options:
		raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))
	detectors = datasource.GWDataSourceInfo(options)
	if len(detectors.channel_dict) < 2:
		raise ValueError("only coincident searches are supported: must process data from at least two antennae")
	# FIXME: put all svd banks for different detectors in one file.
	try:
		svd_banks = [inspiral.parse_svdbank_string(svdbank) for svdbank in options.svd_bank]
	except ValueError as e:
		print "Unable to parse svd banks"
		raise
	# FIXME: should also check for read permissions
	required_files = []
	for svd_bank_set in svd_banks:
		for instrument in svd_bank_set:
			required_files.append(svd_bank_set[instrument])
	if options.veto_segments_file:
		required_files += [options.veto_segments_file]

	missing_files = [filename for filename in required_files if not os.path.exists(filename)]
	if missing_files:
		raise ValueError("files %s do not exist" % ", ".join("'%s'" % filename for filename in sorted(missing_files)))
	#
	# check for mutually exclusive options
	#

	bad_combos = []
	if options.blind_injections and options.injections:
		bad_combos.append("(--blind-injections, --injections)")
	if options.likelihood_file and options.reference_likelihood_file:
		bad_combos.append("(--likelihood-file, --reference-likelihood-file)")

	if bad_combos:
		raise ValueError("must use only one option from each set: %s" % ', '.join(bad_combos))
	#
	# check sanity of options
	#

	# Online specific initialization
	# FIXME: someday support other online sources
	if options.data_source in ("lvshm", "framexmit"):
		missed_options = []
		for option in ["job_tag", "marginalized_likelihood_file"]:
			if getattr(options, option) is None:
				missed_options.append("--%s" % option.replace("_", "-"))

		if missed_options:
			raise ValueError("%s required when --data-source is lvshm or framexmit" % ", ".join(missed_options))

		if len(options.svd_bank) > 1:
			raise ValueError("more than one --svd-bank not allowed when --data-source is lvshm or framexmit, %d given" % len(options.svd_bank))

		# make an "infinite" extent segment
		detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))

		# this gets set so that if you log into a node you can find out what the job id is easily
		os.environ['GSTLAL_LL_JOB'] = options.job_tag
	else:
		bad_options = []
		for option in ["job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval", "reference_likelihood_file"]:
			if getattr(options, option) is not None:
				bad_options.append("--%s" % option.replace("_", "-"))
		if bad_options:
			raise ValueError("%s can only be given when --data-source is lvshm or framexmit" % ", ".join(bad_options))
	if options.reference_psd is None and not options.track_psd:
		raise ValueError("must use --track-psd if no reference PSD is given; both can be used simultaneously")
	if options.local_frame_caching and not options.data_source == "frames":
		raise ValueError('--local-frame-caching can only be used if --data-source = "frames"')
	if options.chisq_type not in ["autochisq", "timeslicechisq"]:
		raise ValueError("--chisq-type must be one of (autochisq|timeslicechisq), given %s" % (options.chisq_type))
	if options.reference_likelihood_file:
		likelihood_namedtuples_list = [namedtuple('likelihood_namedtuple', ('likelihood_file', 'reference_likelihood_file'))(None, options.reference_likelihood_file)]
	else:
		likelihood_namedtuples_list = [namedtuple('likelihood_namedtuple', ('likelihood_file', 'reference_likelihood_file'))(likelihood_file, None) for likelihood_file in options.likelihood_file]

	if not (len(options.svd_bank) == len(options.output) == len(likelihood_namedtuples_list)):
		raise ValueError("must have equal numbers of svd banks, output files and likelihood files")
	#
	# Option checks complete
	#

	# do this before converting option types
	process_params = options.__dict__.copy()

	options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)
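	# e.g. "874000000:874000100" is parsed into a single segment spanning
	# those GPS times; the default ":" yields a segment covering all time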
	if options.blind_injections is not None:
		detectors.injection_filename = options.blind_injections

	# Setup local caching
	if options.local_frame_caching:
		f, fname = tempfile.mkstemp(".cache")
		if options.verbose:
			print >> sys.stderr, "caching frame data locally to ", fname
		f = open(fname, "w")
		# FIXME: should try to down-sample if possible.  there are
		# MDCs data sets floating around whose streams do not start
		# on integer second boundaries, however, and it's possible
		# to trigger a failure in the frame muxer if those get
		# down-sampled so for now we're not doing any resampling.
		# later, when we don't care about those MDCs, we can go
		# back to down-sampling.  if not being able to down-sample
		# is a problem in the meantime, I think the best way
		# forward is to clip the start of said streams using a drop
		# element and a (to be written) buffer probe that figures
		# out how many samples to clip off the start of the stream.
		# FIXME: shouldn't use tempfile.gettempdir() directly, use
		# _CONDOR_SCRATCH_DIR like glue?
		# FIXME: note that at least for now condor sets TMPDIR to the
		# run scratch space so this *will* work properly
		detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), verbose = options.verbose)
		for cacheentry in detectors.local_cache_list:
			# Guarantee a lal cache compliant file with only integer starts and durations.
			cacheentry.segment = segments.segment(int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])))
			print >>f, str(cacheentry)
		detectors.frame_cache = fname

		# the injections are now present in the data so we don't want to do them twice
		detectors.injection_filename = None
	# Choose to optionally reconstruct segments around injections (not blind injections!)
	if options.injections:
		reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections)
	else:
		reconstruction_segment_list = None

	return options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list, likelihood_namedtuples_list
#
# =============================================================================
#
#
# =============================================================================
#
class OneTimeSignalHandler(object):
	def __init__(self, pipeline):
		self.pipeline = pipeline
		self.count = 0

	def __call__(self, signum, frame):
		self.count += 1
		if self.count == 1:
			print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
			try:
				# FIXME: how do I choose a timestamp?
				self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp = now().ns()))
				if not self.pipeline.send_event(gst.event_new_eos()):
					raise Exception("pipeline.send_event(EOS) returned failure")
			except Exception as e:
				print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
				os._exit(1)
		else:
			print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)
#
# =============================================================================
#
#
# =============================================================================
#
options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list, likelihood_namedtuples_list = parse_command_line()
if not options.check_time_stamps:
	# replace the timestamp-checker factory with a pass-through so no
	# checker elements are inserted into the pipeline
	pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src
#
# Parse the veto segments file(s) if provided
#

if options.veto_segments_file is not None:
	veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
else:
	veto_segments = None
#
# There are three modes for psds in this program
# 1) --reference-psd without --track-psd - a fixed psd (provided by the user) will be used to whiten the data
# 2) --track-psd without --reference-psd - a psd will be measured and used on the fly
# 3) --track-psd with --reference-psd - a psd will be measured on the fly, but the first guess will come from the user's provided psd
#

if options.reference_psd is not None:
	psd = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lalseries.LIGOLWContentHandler))
else:
	psd = dict((instrument, None) for instrument in detectors.channel_dict)
#
# Process svd banks in serial
#

for svd_bank, output_filename, likelihood_namedtuple in zip(svd_banks, options.output, likelihood_namedtuples_list):
	#
	# create a new, empty, Bottle application and make it the current
	# default, then start http server(s) to serve it up
	#

	bottle.default_app.push()
	# uncomment the next line to show tracebacks when something fails
	#bottle.app().catchall = False
	httpservers = httpinterface.HTTPServers(0, bottle_app = bottle.default_app(), service_name = "gstlal_inspiral" + (" (%s)" % options.job_tag if options.job_tag is not None else ""), service_properties = {"job_tag": options.job_tag if options.job_tag is not None else ""}, verbose = options.verbose)
	#
	# Set up a registry of the resources that this job provides
	#

	@bottle.route("/")
	@bottle.route("/index.html")
	def index(job_tag = options.job_tag, instruments = set(svd_bank.keys())):
		host = socket.gethostname()
		server_address = "http://%s:%d" % (host, httpservers[0][0].port)
		yield "<html><body>\n<h3>%s %s %s</h3>\n<p>\n" % (job_tag, host, " ".join(sorted(instruments)))
		for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
			if route.rule in ("/", "/index.html"):
				# don't create links back to this page
				continue
			if route.method != "GET":
				# only create links for GET methods
				continue
			yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
		yield "</p>\n</body></html>"

	# FIXME: get service-discovery working, then don't do this
	if "GSTLAL_LL_JOB" in os.environ:
		open("%s_registry.txt" % os.environ["GSTLAL_LL_JOB"], "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))
	#
	# parse SVD template bank and expose bank metadata
	#

	banks = inspiral.parse_bank_files(svd_bank, verbose = options.verbose)

	@bottle.route("/bank.txt")
	def get_filter_length_and_chirpmass(banks = banks):
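		# served as a single line of text: the current GPS time, the
		# filter length (seconds) and the chirp mass of the first template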
		bank = banks.values()[0][0]	# FIXME: maybe shouldn't just take the first ones
		yield '%.14g %.4g %.4g' % (float(now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)
583 print >>sys.stderr, "assembling pipeline ...",
585 pipeline = gst.Pipeline("gstlal_inspiral")
586 mainloop = gobject.MainLoop()
	triggersrc = lloidparts.mkLLOIDmulti(
		pipeline,
		detectors = detectors,
		banks = banks,
		psd = psd,
		psd_fft_length = options.psd_fft_length,
		ht_gate_threshold = options.ht_gate_threshold,
		veto_segments = veto_segments,
		verbose = options.verbose,
		nxydump_segment = options.nxydump_segment,
		chisq_type = options.chisq_type,
		track_psd = options.track_psd,
		control_peak_time = options.control_peak_time,
		fir_stride = options.fir_stride,
		reconstruction_segment_list = reconstruction_segment_list
	)

	if options.verbose:
		print >>sys.stderr, "done"
	#
	# Load likelihood ratio data, assume injections are present!
	#

	if options.data_source in ("lvshm", "framexmit"):
		coinc_params_distributions, ranking_data, seglists = far.parse_likelihood_control_doc(ligolw_utils.load_filename(likelihood_namedtuple[0] if likelihood_namedtuple[0] is not None else likelihood_namedtuple[1], verbose = options.verbose, contenthandler = far.ThincaCoincParamsDistributions.LIGOLWContentHandler))
		assert set(seglists) == set(detectors.channel_dict)
		if coinc_params_distributions is None:
			raise ValueError("\"%s\" does not contain parameter distribution data" % (likelihood_namedtuple[0] if likelihood_namedtuple[0] is not None else likelihood_namedtuple[1]))
	else:
		coinc_params_distributions, ranking_data, seglists = far.ThincaCoincParamsDistributions(), None, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in detectors.channel_dict)
	#
	# build output document
	#

	if options.verbose:
		print >>sys.stderr, "initializing output document ..."
	output = inspiral.Data(
		filename = output_filename or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
		process_params = process_params,
		instruments = set(detectors.channel_dict),
		seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)),	# online data doesn't have a segment so make it all possible time
		coincidence_threshold = options.coincidence_threshold,
		coinc_params_distributions = coinc_params_distributions,
		ranking_data = ranking_data,
		marginalized_likelihood_file = options.marginalized_likelihood_file,
		likelihood_files_namedtuple = likelihood_namedtuple,
		injection_filename = options.injections,
		time_slide_file = options.time_slide_file,
		comment = options.comment,
		tmp_path = options.tmp_space,
		likelihood_snapshot_interval = options.likelihood_snapshot_interval,	# seconds
		thinca_interval = options.thinca_interval,
		sngls_snr_threshold = options.singles_threshold,
		gracedb_far_threshold = options.gracedb_far_threshold,
		gracedb_group = options.gracedb_group,
		gracedb_search = options.gracedb_search,
		gracedb_pipeline = options.gracedb_pipeline,
		verbose = options.verbose
	)
	if options.verbose:
		print >>sys.stderr, "... output document initialized"
	if options.verbose:
		print >>sys.stderr, "attaching appsinks to pipeline ...",
	appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
	appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
	if options.verbose:
		print >>sys.stderr, "attached %d, done" % len(appsinks)
	#
	# if we request a dot graph of the pipeline, set it up
	#

	if options.write_pipeline is not None:
		pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
	if options.data_source in ("lvshm", "framexmit"):

		# set up signal handlers to shut the pipeline down gracefully;
		# this is how the program stops gracefully, it is the only way to
		# stop it.  Otherwise it runs forever.
		#
		# this is only done in the online case because we want
		# offline jobs to just die and not write partial databases

		signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
		signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
	if options.verbose:
		print >>sys.stderr, "setting pipeline state to playing ..."
	if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
		raise RuntimeError("pipeline did not enter playing state")
	if options.write_pipeline is not None:
		pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)
	if options.verbose:
		print >>sys.stderr, "running pipeline ..."
	mainloop.run()
	output.write_output_file(filename = output_filename or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)
	#
	# Cleanup template bank temp files
	#

	for ifo in banks:
		for bank in banks[ifo]:
			if options.verbose:
				print >> sys.stderr, "removing file: ", bank.template_bank_filename
			os.remove(bank.template_bank_filename)
	#
	# Shutdown the web interface servers and garbage collect the Bottle
	# app.  This should release the references the Bottle app's routes
	# hold to the pipeline's data (like template banks and so on).
	#

	del httpservers
	bottle.default_app.pop()
	if pipeline.set_state(gst.STATE_NULL) != gst.STATE_CHANGE_SUCCESS:
		raise RuntimeError("pipeline could not be set to NULL")
#
# Cleanup local caches
#

if options.local_frame_caching:
	if options.verbose:
		print >>sys.stderr, "deleting temporary cache file ", detectors.frame_cache
	os.remove(detectors.frame_cache)
	del detectors.local_cache_list
#
# done.  the online pipeline always ends with an error code so that dagman does
# not mark the job "done" and the job will be restarted when the dag is
# restarted.
#

if options.data_source in ("lvshm", "framexmit"):
	sys.exit(1)