gstlal-inspiral  0.4.2
 All Classes Namespaces Files Functions Variables Pages
gstlal_iir_inspiral
1 #!/usr/bin/env python
2 #
3 # Copyright (C) 2011 Shaun Hooper, Chad Hanna
4 # Copyright (C) 2013-2014 Qi Chu
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License as published by the
8 # Free Software Foundation; either version 2 of the License, or (at your
9 # option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with this program; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 """Stream-based inspiral analysis tool use IIR filters"""
20 
21 ## @file
22 # A version of gstlal_inspiral that uses IIR filter banks
23 #
24 # + `--psd-fft-length` [s] (int): FFT length, default 16s.
25 # + `--veto-segments-file` [filename]: Set the name of the LIGO light-weight XML file from which to load vetoes (optional).
26 # + `--veto-segments-name` [name]: Set the name of the segments to extract from the segment tables and use as the veto list (default = "vetoes").
# + `--nxydump-segment` [start:stop]: Set the time interval to dump from nxydump elements (optional). The default is ":", i.e. dump all time.
28 # + `--output` [filename]: Set the filename in which to save the triggers (required).
29 # + `--reference-psd` [filename]: Load the spectrum from this LIGO light-weight XML file (required).
30 # + `--track-psd`: Track PSD even if a reference is given
31 # + `--iir-bank` [filename]: Set the name of the LIGO light-weight XML file from which to load the iir template bank (required) format H1:bank1.xml,H2:bank2.xml,L1:bank3.xml,H2:bank4.xml,...
32 # + `--time-slide-file` [filename]: Set the name of the xml file to get time slide offsets.
33 # + `--ht-gate-threshold` [threshold] (float): Set the threshold on whitened h(t) to mark samples as gaps (glitch removal).
34 # + `--chisq-type` [type]: Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.
35 # + `--coincidence-threshold` [value] (float): Set the coincidence window in seconds (default = 0.020). The light-travel time between instruments will be added automatically in the coincidence test.
36 # + `--write-pipeline` [filename]: Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.
37 # + `--comment` [str]
38 # + `--check-time-stamps`: Turn on time stamp checking.
39 # + `--verbose`: Be verbose (optional).
40 # + `--tmp-space` [path]: Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.
41 # + `--blind-injections` [filename]: Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case.
42 # + `--svd-bank` [filename]: Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.
43 # + `--control-peak-time` [time] (int): Set a time window in seconds to find peaks in the control signal.
44 # + `--fir-stride` [time] (int): Set the length of the fir filter stride in seconds (default = 8).
# + `--job-tag`: Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc. (required).
46 # + `--likelihood-file` [filename]: Set the name of the likelihood ratio data file to use (optional). If not specified, likelihood ratios will not be assigned to coincs.
47 # + `--marginalized-likelihood-file` [filename]: Set the name of the file from which to load initial marginalized likelihood ratio data (required).
48 # + `--gracedb-far-threshold` (float): False alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent.
49 # + `--likelihood-snapshot-interval` [seconds] (float): How often to reread the marginalized likelihoood data and snapshot the trigger files.
50 # + `--gracedb-search`: gracedb type (default is LowMass).
51 # + `--gracedb-group`: gracedb group (default is Test).
52 # + `--gracedb-pipeline`: gracedb pipeline (default is gstlal-spiir).
53 # + `--thinca-interval` [secs] (float): Set the thinca interval (default = 30s).
54 
55 #
56 # =============================================================================
57 #
58 # Preamble
59 #
60 # =============================================================================
61 #
62 
63 
import os
import resource
import sys
from optparse import OptionParser
import signal
import subprocess
import socket
import time


# The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
import pygtk
pygtk.require("2.0")
import gobject
gobject.threads_init()
import pygst
pygst.require("0.10")
import gst
from gstlal import bottle

from glue import segments
from glue import segmentsUtils
from glue.ligolw import ligolw
from glue.ligolw import array as ligolw_array
from glue.ligolw import param as ligolw_param
from glue.ligolw import lsctables
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import segments as ligolw_segments
from pylal.date import XLALUTCToGPS
from pylal.datatypes import LIGOTimeGPS
from pylal import series as lalseries
from gstlal import far
from gstlal import inspiral
from gstlal import pipeparts
from gstlal import lloidparts
from gstlal import spiirparts
from gstlal import simplehandler
from gstlal import inspiral
from gstlal import httpinterface
from gstlal import datasource
103 
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
	# LIGO Light-Weight XML content handler used for every document this
	# program loads (reference PSD, veto segments, IIR banks, likelihood
	# data), with Array, Param and lsctables parsing enabled below.
	pass
ligolw_array.use_in(LIGOLWContentHandler)
ligolw_param.use_in(LIGOLWContentHandler)
lsctables.use_in(LIGOLWContentHandler)
109 
def excepthook(*args):
	# system exception hook that forces hard exit.  without this,
	# exceptions that occur inside python code invoked as a call-back
	# from the gstreamer pipeline just stop the pipeline, they don't
	# cause gstreamer to exit.

	# FIXME: they probably *would* cause if we could figure out why
	# element errors and the like simply stop the pipeline instead of
	# crashing it, as well.  Perhaps this should be removed when/if the
	# "element error's don't crash program" problem is fixed
	sys.__excepthook__(*args)	# print the traceback via the default hook
	os._exit(1)	# hard exit:  skips atexit handlers, guarantees termination

sys.excepthook = excepthook
124 
#
# Make sure we have sufficient resources.
# We allocate far more memory than we need, so this is okay
#

# raise the soft limit on the number of processes up to the hard limit
maxproc = resource.getrlimit(resource.RLIMIT_NPROC)[1]
resource.setrlimit(resource.RLIMIT_NPROC, (maxproc, maxproc))

# raise the soft limit on the total address-space size up to the hard limit
maxas = resource.getrlimit(resource.RLIMIT_AS)[1]
resource.setrlimit(resource.RLIMIT_AS, (maxas, maxas))

# shrink the per-thread stack size so many threads fit in the address space
maxstack = resource.getrlimit(resource.RLIMIT_STACK)[1]
resource.setrlimit(resource.RLIMIT_STACK, (1 * 1024**2, maxstack))	# 1MB per thread, not 10
141 
142 
def now():
	"""
	Return the current UTC time as a GPS time.

	NOTE:  XLALUTCToGPS was not imported by the original file, so every
	call to this function raised NameError (it is invoked by the
	/bank.txt bottle route and by the SIGINT/SIGTERM checkpoint
	handler).  The import of pylal.date.XLALUTCToGPS at the top of the
	file supplies the missing name.
	"""
	return XLALUTCToGPS(time.gmtime())
146 #
147 # =============================================================================
148 #
149 # Command Line
150 #
151 # =============================================================================
152 #
153 
def parse_command_line():
	"""
	Define the command line interface, parse it, and perform option
	checking.

	Returns the tuple (options, filenames, process_params, iir_banks,
	detectors):  process_params is a copy of the option dictionary
	taken before any option values are converted to other types (for
	recording in the output document's process_params table),
	iir_banks maps instrument name --> list of IIR bank files, and
	detectors is the datasource.GWDataSourceInfo describing the data
	to be analyzed.

	For online data sources ("lvshm", "framexmit") this also registers
	the /registry.txt bottle route, writes the job's registry file,
	and sets the GSTLAL_LL_JOB environment variable as side effects.
	"""
	parser = OptionParser(
		description = __doc__
	)

	# append all the datasource specific options
	datasource.append_options(parser)

	parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
	parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
	parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
	parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional). The default is \":\", i.e. dump all time.")
	parser.add_option("--output", metavar = "filename", help = "Set the filename in which to save the triggers (required)")
	parser.add_option("--reference-psd", metavar = "filename", help = "load the spectrum from this LIGO light-weight XML file (required).")
	parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given")
	parser.add_option("--iir-bank", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load the iir template bank (required) format H1:bank1.xml,H2:bank2.xml,L1:bank3.xml,H2:bank4.xml,...")
	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file to get time slide offsets")
	parser.add_option("--ht-gate-threshold", metavar = "threshold", type = "float", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal)")
	parser.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
	parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.020, help = "Set the coincidence window in seconds (default = 0.020). The light-travel time between instruments will be added automatically in the coincidence test.")
	parser.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
	parser.add_option("--comment", metavar="str")
	parser.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
	parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
	parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral_table or otherwise processing the data differently. Has the effect of having hidden signals in the input data. --injections must not be specified in this case")
	#FIXME: in order to be compatible with SVD method, the following paramters are kept though not used for iir filtering
	parser.add_option("--svd-bank", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument in the form ifo:file These can be given as a comma separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments.")
	parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal")
	parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")

	#FIXME: do not consider online paramters yet
	parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc. required")
	parser.add_option("--likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use (optional). If not specified, likelihood ratios will not be assigned to coincs.")
	parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).")
	parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
	parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
	parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb search, default is LowMass")
	parser.add_option("--gracedb-pipeline", default = "gstlal-spiir", help = "gracedb pipeline, default is gstlal-spiir")
	parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default is Test")
	parser.add_option("--thinca-interval", metavar = "secs", type = "float", default = 30.0, help = "Set the thinca interval, default = 30s")

	options, filenames = parser.parse_args()

	# a PSD must either be supplied or measured on the fly (or both)
	if options.reference_psd is None and not options.track_psd:
		raise ValueError("must use --track-psd if no reference psd is given, you can use both simultaneously")

	# --blind-injections and --injections are mutually exclusive
	if options.blind_injections and options.injections:
		raise ValueError("must use only one of --blind-injections or --injections")

	required_options = []

	missing_options = []
	# FIXME: wield way to be compatible with SVD method:  an --svd-bank
	# argument is silently treated as the --iir-bank argument
	if options.svd_bank is not None:
		options.iir_bank = options.svd_bank

	if options.iir_bank is None:
		missing_options += ["--iir-bank"]
	missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
	if missing_options:
		raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options))


	# parse the datasource specific information and do option checking
	detectors = datasource.GWDataSourceInfo(options)
	if len(detectors.channel_dict) < 2:
		raise ValueError("only coincident searches are supported: must process data from at least two antennae")


	# Get the banks and make the detectors:  instrument --> [bank files]
	iir_banks = inspiral.parse_iirbank_string(options.iir_bank)

	# FIXME: should also check for read permissions
	required_files = []
	for instrument in iir_banks:
		required_files.extend(iir_banks[instrument])
	if options.veto_segments_file:
		required_files += [options.veto_segments_file]
	missing_files = [filename for filename in required_files if not os.path.exists(filename)]
	if missing_files:
		raise ValueError, "files %s do not exist" % ", ".join("'%s'" % filename for filename in sorted(missing_files))

	if options.chisq_type not in ["autochisq", "timeslicechisq"]:
		raise ValueError, "--chisq-type must be one of (autochisq|timeslicechisq), given %s" % (options.chisq_type)

	# do this before converting option types so the process_params table
	# records the options as given on the command line
	process_params = options.__dict__.copy()

	# convert the start:stop string into a real segment
	options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)

	# Online specific initialization
	if options.data_source in ("lvshm", "framexmit"):
		# check required options in this case
		required_options = ["likelihood_file", "job_tag", "marginalized_likelihood_file"]

		missing_options += ["--%s" % option.replace("_", "-") for option in required_options if getattr(options, option) is None]
		if missing_options:
			raise ValueError, "missing required option(s) %s" % ", ".join(sorted(missing_options))

		# make an "infinite" extent segment
		detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))

		# this gets set so that if you log into a node you can find out what the job id is easily
		os.environ['GSTLAL_LL_JOB'] = options.job_tag

		#
		# Set up a registry of the resources that this job provides
		#

		host = socket.gethostname()

		# FIXME
		# update this as bottle routes are added, can we do this automatically?
		fname = os.path.join(os.getcwd(), os.environ['GSTLAL_LL_JOB'] + "_registry.txt")
		f = open(fname, "w")

		@bottle.route("/registry.txt")
		def register(fname = None):
			# generator yielding the registry document one line at a
			# time:  a comment header, then one URL per line

			yield "# %s %s %s\n" % (options.job_tag, host, " ".join(set(iir_banks.keys())))

			# First do urls that do not depend on instruments
			for request in ("registry.txt", "gracedb_far_threshold.txt", "latency_histogram.txt", "latency_history.txt", "snr_history.txt", "ram_history.txt", "likelihood.xml", "bank.txt", "segments.xml"):
				# FIXME don't hardcode port number
				yield "http://%s:16953/%s\n" % (host, request)

			# Then do instrument dependent urls
			for ifo in set(iir_banks.keys()):
				for request in ("strain_add_drop.txt", "state_vector_on_off_gap.txt", "psd.txt"):
					# FIXME don't hardcode port number
					yield "http://%s:16953/%s/%s\n" % (host, ifo, request)

		# also write the registry document to the per-job file now
		[f.write(l) for l in register(fname)]
		f.close()
	else:
		# offline running:  the online-only options must not be given
		bad_options = []
		for option in ["likelihood_file", "job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval"]:
			if getattr(options, option) is not None:
				bad_options.append(option)
		if bad_options:
			raise ValueError("%s options should only be given for online running" % ",".join(bad_options))

	#FIXME: job tag and output can not be both none
#	if options.job_tag is None and options.output is None:
#		raise ValueError("must provide --job-tag or --output for output file naming purpose")

	return options, filenames, process_params, iir_banks, detectors
306 
307 
308 #
309 # =============================================================================
310 #
311 # Main
312 #
313 # =============================================================================
314 #
315 
316 
317 #
318 # parse command line
319 #
320 
321 
322 options, filenames, process_params, iir_banks, detectors = parse_command_line()
323 
324 if not options.check_time_stamps:
325  pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src
326 
327 
328 #
329 # Parse the vetos segments file(s) if provided
330 #
331 
332 
333 if options.veto_segments_file is not None:
334  veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
335 else:
336  veto_segments = None
337 
338 
339 #
340 # set up the PSDs
341 #
342 # There are three modes for psds in this program
343 # 1) --reference-psd without --track-psd - a fixed psd (provided by the user) will be used to whiten the data
344 # 2) --track-psd without --reference-psd - a psd will me measured and used on the fly
345 # 3) --track-psd with --reference-psd - a psd will be measured on the fly, but the first "guess will come from the users provided psd
346 #
347 
348 
349 if options.reference_psd is not None:
350  psd = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = LIGOLWContentHandler))
351 else:
352  psd = dict((instrument, None) for instrument in detectors.channel_dict)
353 
354 
#
# Get IIR banks
#

# instrument --> list of bank objects parsed from the bank files.
# (the original also had a redundant "banks = {}" that was immediately
# overwritten; removed.)
banks = inspiral.parse_iirbank_files(iir_banks, verbose = options.verbose)
for instrument in banks:
	for n, bank in enumerate(banks[instrument]):
		bank.logname = "%sbank%d" % (instrument, n)

@bottle.route("/bank.txt")
def get_name(banks = banks):
	bank = banks.values()[0][0]	#FIXME maybe shouldn't just take the first ones
	# BUGFIX:  template_bank_filename is a filename string, so the
	# original "%.4g" conversion raised TypeError ("float argument
	# required") whenever this route was fetched; use %s instead.
	yield '%.14g %s ' % (float(now()), bank.template_bank_filename)
368 
369 
#
# Build pipeline
#


if options.verbose:
	print >>sys.stderr, "assembling pipeline ...",

pipeline = gst.Pipeline("gstlal_iir_inspiral")
mainloop = gobject.MainLoop()

# assemble the SPIIR filtering network;  triggersrc maps instrument -->
# list of trigger source elements (appsinks are attached to them below)
triggersrc = spiirparts.mkSPIIRmulti(
	pipeline,
	detectors = detectors,
	banks = banks,
	psd = psd,
	psd_fft_length = options.psd_fft_length,
	ht_gate_threshold = options.ht_gate_threshold,
	veto_segments = veto_segments,
	verbose = options.verbose,
	nxydump_segment = options.nxydump_segment,
	chisq_type = options.chisq_type,
	track_psd = options.track_psd,
	blind_injections = options.blind_injections
)


if options.verbose:
	print >>sys.stderr, "done"
399 
400 
#
# Load likelihood ratio data, assume injections are present
#


if options.likelihood_file is not None:
	# BUGFIX:  the original passed the bare name "likelihood_file" to
	# load_filename();  that name is not defined at module scope, so
	# this branch always raised NameError.  It must be
	# options.likelihood_file.
	coinc_params_distributions, ranking_data, seglists = far.parse_likelihood_control_doc(ligolw_utils.load_filename(options.likelihood_file, verbose = options.verbose, contenthandler = far.ThincaCoincParamsDistributions.LIGOLWContentHandler))
	# the live-time segments in the likelihood file must cover exactly
	# the instruments being analyzed
	assert set(seglists) == set(detectors.channel_dict)
else:
	# no likelihood data:  start from empty parameter distributions, no
	# ranking data, and empty live-time segment lists
	coinc_params_distributions, ranking_data, seglists = far.ThincaCoincParamsDistributions(), None, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in detectors.channel_dict)
411 
412 
#
# build output document
#


if options.verbose:
	print >>sys.stderr, "initializing output document ..."
# inspiral.Data collects triggers from the appsinks, performs coincidence,
# assigns likelihood ratios, and writes/uploads the results
output = inspiral.Data(
	filename = options.output or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
	process_params = process_params,
	pipeline = pipeline,
	instruments = set(detectors.channel_dict),
	seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)),	# online data doesn't have a segment so make it all possible time
	coincidence_threshold = options.coincidence_threshold,
	coinc_params_distributions = coinc_params_distributions,
	ranking_data = ranking_data,
	marginalized_likelihood_file = options.marginalized_likelihood_file,
	likelihood_file = options.likelihood_file,
	injection_filename = options.injections,
	time_slide_file = options.time_slide_file,
	comment = options.comment,
	tmp_path = options.tmp_space,
	likelihood_snapshot_interval = options.likelihood_snapshot_interval,	# seconds
	thinca_interval = options.thinca_interval,
	gracedb_far_threshold = options.gracedb_far_threshold,
	gracedb_group = options.gracedb_group,
	gracedb_search = options.gracedb_search,
	gracedb_pipeline = options.gracedb_pipeline,
	verbose = options.verbose
)
if options.verbose:
	print >>sys.stderr, "... output document initialized"
445 
# the handler drives the main loop:  it processes pipeline messages and
# delivers segments/checkpoints to the output document
handler = lloidparts.Handler(mainloop, pipeline, output, instruments = set(detectors.channel_dict), tag = options.job_tag, seglistdict = seglists, verbose = options.verbose)

if options.verbose:
	print >>sys.stderr, "attaching appsinks to pipeline ...",
# one appsink per trigger source element, all funnelled through a single
# AppSync into output.appsink_new_buffer
appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
if options.verbose:
	print >>sys.stderr, "attached %d, done" % len(appsinks)
454 
455 
#
# if we request a dot graph of the pipeline, set it up
#


if options.write_pipeline is not None:
	pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
	# dump the as-built graph in its initial (NULL) state
	pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
464 
465 
#
# Run pipeline
#


if options.data_source not in ("lvshm", "framexmit"):
	# offline:  pause the pipeline first so it can preroll;  no HTTP
	# interface is needed
	if options.verbose:
		print >>sys.stderr, "setting pipeline state to paused ..."
	if pipeline.set_state(gst.STATE_PAUSED) != gst.STATE_CHANGE_SUCCESS:
		raise RuntimeError, "pipeline did not enter paused state"
	httpservers = None
else:
	#
	# start http interface
	#

	# FIXME: don't hard-code port, don't use in this program right now since more than one job runs per machine
	httpservers = httpinterface.HTTPServers(16953, verbose = options.verbose)

	#
	# setup sigint handler to shutdown pipeline. this is how the program stops
	# gracefully, it is the only way to stop it. Otherwise it runs forever
	# man.
	#


	class OneTimeSignalHandler(object):
		# signal handler:  on the first SIGINT/SIGTERM post a checkpoint
		# message and send EOS to shut the pipeline down gracefully;
		# on later signals just report that shutdown is in progress
		def __init__(self, pipeline):
			self.pipeline = pipeline
			self.count = 0	# number of signals received so far

		def __call__(self, signum, frame):
			self.count += 1
			if self.count == 1:
				print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
				try:
					#FIXME how do I choose a timestamp?
					self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp=now().ns()))
					if not self.pipeline.send_event(gst.event_new_eos()):
						raise Exception("pipeline.send_event(EOS) returned failure")
				except Exception, e:
					print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
					os._exit(1)
			else:
				print >>sys.stderr, "*** received SIG %d %d times... ***" % (signum, self.count)

	signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
	signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
	# FIXME get rid of this someday: Repair the shared memory just in case before starting
	for partition in ("LHO_Data", "LLO_Data", "VIRGO_Data"):
		subprocess.call(["smrepair", partition])
516 
if options.verbose:
	print >>sys.stderr, "setting pipeline state to playing ..."
if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
	raise RuntimeError, "pipeline did not enter playing state"

if options.write_pipeline is not None:
	# dump the graph again now that the pipeline is PLAYING
	pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)

if options.verbose:
	print >>sys.stderr, "running pipeline ..."
# blocks until the handler quits the loop (EOS or shutdown)
mainloop.run()


#
# write output file
#


output.write_output_file(filename = options.output or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), likelihood_file = options.likelihood_file, verbose = options.verbose)


#
# done
#

# shut down the HTTP servers (no-op offline, where httpservers is None)
del httpservers
if options.data_source in ("lvshm", "framexmit"):
	sys.exit(1)	# online pipeline always ends with an error code so condor restarts it
545