gstlal-inspiral 0.4.2
gstlal_inspiral
#!/usr/bin/env python
#
# Copyright (C) 2009-2014 Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Stream-based inspiral analysis tool"""


#
# =============================================================================
#
# Preamble
#
# =============================================================================
#

## @file gstlal_inspiral
# A program to analyze gravitational wave data for compact binary coalescence in real time or in an offline mode
#
# @dot
# digraph llpipe {
# labeljust = "r";
# label="gstlal_inspiral"
# rankdir=LR;
# graph [fontname="Roman", fontsize=24];
# edge [ fontname="Roman", fontsize=10 ];
# node [fontname="Roman", shape=box, fontsize=11];
#
# gracedb [label="GW\nCandidate\nDatabase", shape=oval, color=tomato3, style=filled];
#
#
# subgraph clusterNodeN {
#
# style=rounded;
# label="gstreamer pipeline";
# labeljust = "r";
# fontsize = 14;
#
# H1src [label="H1 data source:\n mkbasicsrc()", color=red4, URL="\ref pipeparts.mkbasicsrc()"];
# L1src [label="L1 data source:\n mkbasicsrc()", color=green4, URL="\ref pipeparts.mkbasicsrc()"];
# V1src [label="V1 data source:\n mkbasicsrc()", color=magenta4];
#
# H1multirate [label="H1 whitening and downsampling:\nmkwhitened_multirate_src()", color=red4, URL="\ref multirate_datasource.mkwhitened_multirate_src()"];
# L1multirate [label="L1 whitening and downsampling:\nmkwhitened_multirate_src()", color=green4, URL="\ref multirate_datasource.mkwhitened_multirate_src()"];
# V1multirate [label="V1 whitening and downsampling:\nmkwhitened_multirate_src()", color=magenta4, URL="\ref multirate_datasource.mkwhitened_multirate_src()"];
#
# H1LLOID [label="H1 LLOID filtering engine:\nmkLLOIDmulti()", color=red4, URL="\ref lloidparts.mkLLOIDmulti()"];
# L1LLOID [label="L1 LLOID filtering engine:\nmkLLOIDmulti()", color=green4, URL="\ref lloidparts.mkLLOIDmulti()"];
# V1LLOID [label="V1 LLOID filtering engine:\nmkLLOIDmulti()", color=magenta4, URL="\ref lloidparts.mkLLOIDmulti()"];
#
# H1Trig1 [label="H1 Triggering:\nsub bank 1", color=red4];
# L1Trig1 [label="L1 Triggering:\nsub bank 1", color=green4];
# V1Trig1 [label="V1 Triggering:\nsub bank 1", color=magenta4];
# H1Trig2 [label="H1 Triggering:\nsub bank 2", color=red4];
# L1Trig2 [label="L1 Triggering:\nsub bank 2", color=green4];
# V1Trig2 [label="V1 Triggering:\nsub bank 2", color=magenta4];
# H1TrigN [label="H1 Triggering:\nsub bank N", color=red4];
# L1TrigN [label="L1 Triggering:\nsub bank N", color=green4];
# V1TrigN [label="V1 Triggering:\nsub bank N", color=magenta4];
#
# H1src -> H1multirate;
# L1src -> L1multirate;
# V1src -> V1multirate;
#
# H1multirate -> H1LLOID [label="h(t) 4096Hz"];
# L1multirate -> L1LLOID [label="h(t) 4096Hz"];
# V1multirate -> V1LLOID [label="h(t) 4096Hz"];
# H1multirate -> H1LLOID [label="h(t) 2048Hz"];
# L1multirate -> L1LLOID [label="h(t) 2048Hz"];
# V1multirate -> V1LLOID [label="h(t) 2048Hz"];
# H1multirate -> H1LLOID [label="h(t) Nth-pow-of-2 Hz"];
# L1multirate -> L1LLOID [label="h(t) Nth-pow-of-2 Hz"];
# V1multirate -> V1LLOID [label="h(t) Nth-pow-of-2 Hz"];
#
# H1LLOID -> H1Trig1 [label="SNRs sub bank 1"];
# L1LLOID -> L1Trig1 [label="SNRs sub bank 1"];
# V1LLOID -> V1Trig1 [label="SNRs sub bank 1"];
# H1LLOID -> H1Trig2 [label="SNRs sub bank 2"];
# L1LLOID -> L1Trig2 [label="SNRs sub bank 2"];
# V1LLOID -> V1Trig2 [label="SNRs sub bank 2"];
# H1LLOID -> H1TrigN [label="SNRs sub bank N"];
# L1LLOID -> L1TrigN [label="SNRs sub bank N"];
# V1LLOID -> V1TrigN [label="SNRs sub bank N"];
# }
#
#
# Coincidence [label="Coincidence\nO(1)s latency"];
# SigEst [label="Significance\nEstimation\nO(1)s latency"];
# Thresh [label="Thresholding\nO(1)s latency"];
# EventGen [label="Event\nGeneration\nO(1)s latency"];
#
# H1Trig1 -> Coincidence [label="Trigs sub bank 1"];
# L1Trig1 -> Coincidence [label="Trigs sub bank 1"];
# V1Trig1 -> Coincidence [label="Trigs sub bank 1"];
# H1Trig2 -> Coincidence [label="Trigs sub bank 2"];
# L1Trig2 -> Coincidence [label="Trigs sub bank 2"];
# V1Trig2 -> Coincidence [label="Trigs sub bank 2"];
# H1TrigN -> Coincidence [label="Trigs sub bank N"];
# L1TrigN -> Coincidence [label="Trigs sub bank N"];
# V1TrigN -> Coincidence [label="Trigs sub bank N"];
#
# Coincidence -> SigEst -> Thresh -> EventGen;
#
# EventGen -> gracedb;
#
# }
# @enddot
#
# ### Command line interface
#
# + `--local-frame-caching`: Cache frame data locally to a temporary directory to reduce I/O load when running offline.
# + `--psd-fft-length` [s] (int): FFT length, default 16s.
# + `--veto-segments-file` [filename]: Set the name of the LIGO light-weight XML file from which to load vetoes (optional).
# + `--veto-segments-name` [name]: Set the name of the segments to extract from the segment tables and use as the veto list, default = "vetoes".
# + `--nxydump-segment` [start:stop]: Set the time interval to dump from nxydump elements (optional). The default is ":", i.e. dump all time.
# + `--output` [filename]: Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).
# + `--reference-psd` [filename]: Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).
# + `--track-psd`: Track PSD even if a reference is given.
# + `--svd-bank` [filename]: Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument, in the form ifo:file. These can be given as a comma-separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze banks serially. At least one svd bank for at least 2 detectors is required.
# + `--time-slide-file` [filename]: Set the name of the xml file from which to get time slide offsets.
# + `--control-peak-time` [time] (int): Set a time window in seconds to find peaks in the control signal.
# + `--fir-stride` [time] (int): Set the length of the fir filter stride in seconds, default = 8.
# + `--ht-gate-threshold` [threshold] (float): Set the threshold on whitened h(t) to mark samples as gaps (glitch removal), default = infinity.
# + `--chisq-type` [type]: Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.
# + `--coincidence-threshold` [value] (float): Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.
# + `--write-pipeline` [filename]: Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.
# + `--comment`: Set the string to be recorded in comment and tag columns in various places in the output file (optional).
# + `--check-time-stamps`: Turn on time stamp checking.
# + `--verbose`: Be verbose (optional).
# + `--tmp-space` [path]: Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.
# + `--blind-injections` [filename]: Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently. Has the effect of hiding signals in the input data. --injections must not be specified in this case.
# + `--job-tag`: Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.
# + `--likelihood-file` [filename]: Set the name of the likelihood ratio data file to use for ranking events (either --likelihood-file or --reference-likelihood-file must be provided).
# + `--reference-likelihood-file` [filename]: Set the name of the likelihood ratio data file to use for ranking events (--data-source must be lvshm or framexmit, --likelihood-snapshot-interval must be provided, and either --likelihood-file or --reference-likelihood-file must be provided).
# + `--likelihood-snapshot-interval` [seconds] (float): How often to reread the marginalized likelihood data. If --likelihood-file is provided, the likelihood file will be overwritten by a snapshot of the trigger files and a duplicate snapshot will be generated to keep a record of past ranking statistics.
# + `--marginalized-likelihood-file` [filename]: Set the name of the file from which to load initial marginalized likelihood ratio data (required).
# + `--gracedb-far-threshold` (float): False alarm rate threshold for gracedb (Hz); if not given, gracedb events are not sent.
# + `--gracedb-search`: gracedb search (default is LowMass).
# + `--gracedb-pipeline`: gracedb pipeline (default is gstlal).
# + `--gracedb-group`: gracedb group (default is Test).
# + `--thinca-interval` [secs] (float): Set the thinca interval, default = 4s.
# + `--singles-threshold` [SNR] (float): Set the SNR threshold at which to record single-instrument events in the output (default = 8). An example invocation follows the list below.
#
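# #### Example
#
# A hypothetical offline invocation combining the options above with the
# data-source options added by datasource.append_options() (all file,
# channel, and time values here are illustrative, not prescriptive):
#
#     gstlal_inspiral \
#         --data-source frames \
#         --frame-cache H1L1.cache \
#         --channel-name=H1=LDAS-STRAIN --channel-name=L1=LDAS-STRAIN \
#         --gps-start-time 966384015 --gps-end-time 966388111 \
#         --svd-bank H1:H1-SVD-BANK.xml.gz,L1:L1-SVD-BANK.xml.gz \
#         --likelihood-file likelihood.xml.gz \
#         --reference-psd psd.xml.gz \
#         --output triggers.sqlite \
#         --verbose
#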
# ### Review Status
#
# | Names                                          | Hash                                     | Date       |
# | ---------------------------------------------- | ---------------------------------------- | ---------- |
# | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 7536db9d496be9a014559f4e273e1e856047bf71 | 2014-05-02 |
#
# #### Actions
#
# - Consider cleaning up the nxydump segment option. Currently it only works by modifying the source code.
# - Consider changing the thinca-interval name to thinca-cadence.
# - Consider replacing the 'in ("framexmitsrc", "lvshmsrc")' tests with a property or method in the data source class.
# - Consider allowing a single-detector analysis.
# - Consider deleting timeslicechisq.


import os
import resource
import sys
from optparse import OptionParser
import signal
import socket
import time
import tempfile
import math
from collections import namedtuple

# The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
import pygtk
pygtk.require("2.0")
import gobject
gobject.threads_init()
import pygst
pygst.require("0.10")
import gst

import lal

from glue import segments
from glue import segmentsUtils
from glue.ligolw import ligolw
from glue.ligolw import lsctables
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import segments as ligolw_segments
from pylal.datatypes import LIGOTimeGPS
from pylal import series as lalseries
from gstlal import bottle
from gstlal import datasource
from gstlal import lloidparts
from gstlal import far
from gstlal import httpinterface
from gstlal import hoftcache
from gstlal import inspiral
from gstlal import pipeparts
from gstlal import simulation


class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    pass
lsctables.use_in(LIGOLWContentHandler)

#
# Make sure we have sufficient resources.
# We allocate far more memory than we need, so this is okay.
#

def setrlimit(res, lim):
    hard_lim = resource.getrlimit(res)[1]
    resource.setrlimit(res, (lim if lim is not None else hard_lim, hard_lim))
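# note: the helper raises the soft limit to lim (or all the way to the hard
# limit when lim is None) while leaving the hard limit unchanged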

# set the number of processes and total set size up to the hard limit and
# shrink the per-thread stack size (default is 10 MiB)
setrlimit(resource.RLIMIT_NPROC, None)
setrlimit(resource.RLIMIT_AS, None)
setrlimit(resource.RLIMIT_RSS, None)
setrlimit(resource.RLIMIT_STACK, 1024 * 1024)  # 1 MiB per thread


def now():
    return LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0)
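# (now() returns the current UTC time as a GPS time; it is used below by
# the bank.txt status page and for checkpoint message timestamps)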


#
# =============================================================================
#
# Command Line
#
# =============================================================================
#

def parse_command_line():
    parser = OptionParser(
        description = __doc__
    )

    # append all the datasource specific options
    datasource.append_options(parser)

    # local caching to help with I/O for offline running
    parser.add_option("--local-frame-caching", action = "store_true", help = "Cache frame data locally to a temporary directory to reduce I/O load when running offline.")

    parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 16s")
    parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
    parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
    parser.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elements (optional). The default is \":\", i.e. dump all time.")
    parser.add_option("--output", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML output file *.{xml,xml.gz} or an SQLite database *.sqlite (required).")
    parser.add_option("--reference-psd", metavar = "filename", help = "Instead of measuring the noise spectrum, load the spectrum from this LIGO light-weight XML file (optional).")
    parser.add_option("--track-psd", action = "store_true", help = "Track PSD even if a reference is given.")
    parser.add_option("--svd-bank", metavar = "filename", action = "append", help = "Set the name of the LIGO light-weight XML file from which to load the svd bank for a given instrument, in the form ifo:file. These can be given as a comma-separated list such as H1:file1,H2:file2,L1:file3 to analyze multiple instruments. This option can be given multiple times in order to analyze banks serially. At least one svd bank for at least 2 detectors is required.")
    parser.add_option("--time-slide-file", metavar = "filename", help = "Set the name of the xml file from which to get time slide offsets.")
    parser.add_option("--control-peak-time", metavar = "time", type = "int", help = "Set a time window in seconds to find peaks in the control signal.")
    parser.add_option("--fir-stride", metavar = "time", type = "int", default = 8, help = "Set the length of the fir filter stride in seconds, default = 8.")
    parser.add_option("--ht-gate-threshold", metavar = "threshold", type = "float", default = float("inf"), help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal), default = infinity.")
    parser.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
    parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
    parser.add_option("--write-pipeline", metavar = "filename", help = "Write a DOT graph description of the as-built pipeline to this file (optional). The environment variable GST_DEBUG_DUMP_DOT_DIR must be set for this option to work.")
    parser.add_option("--comment", help = "Set the string to be recorded in comment and tag columns in various places in the output file (optional).")
    parser.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking.")
    parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
    parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to a directory suitable for use as a work area while manipulating the database file. The database file will be worked on in this directory, and then moved to the final location when complete. This option is intended to improve performance when running in a networked environment, where there might be a local disk with higher bandwidth than is available to the filesystem on which the final output will reside.")
    parser.add_option("--blind-injections", metavar = "filename", help = "Set the name of an injection file that will be added to the data without saving the sim_inspiral table or otherwise processing the data differently. Has the effect of hiding signals in the input data. --injections must not be specified in this case.")

    # online options

    parser.add_option("--job-tag", help = "Set the string to identify this job and register the resources it provides on a node. Should be 4 digits of the form 0001, 0002, etc.")
    parser.add_option("--likelihood-file", metavar = "filename", action = "append", help = "Set the name of the likelihood ratio data file to use for ranking events (either --likelihood-file or --reference-likelihood-file must be provided).")
    parser.add_option("--reference-likelihood-file", metavar = "filename", help = "Set the name of the likelihood ratio data file to use for ranking events (--data-source must be lvshm or framexmit, --likelihood-snapshot-interval must be provided, and either --likelihood-file or --reference-likelihood-file must be provided).")
    parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihood data. If --likelihood-file is provided, the likelihood file will be overwritten by a snapshot of the trigger files and a duplicate snapshot will be generated to keep a record of past ranking statistics.")
    parser.add_option("--marginalized-likelihood-file", metavar = "filename", help = "Set the name of the file from which to load initial marginalized likelihood ratio data (required).")
    parser.add_option("--gracedb-far-threshold", type = "float", help = "False alarm rate threshold for gracedb (Hz); if not given, gracedb events are not sent.")
    parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb search, default is LowMass")
    parser.add_option("--gracedb-pipeline", default = "gstlal", help = "gracedb pipeline, default is gstlal")
    parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default is Test")
    parser.add_option("--thinca-interval", metavar = "secs", type = "float", default = 4.0, help = "Set the thinca interval, default = 4s.")
    # NOTE: gstlal_inspiral_calc_likelihood searches for this option
    # in the process_params table to determine the threshold below
    # which it can delete uninteresting singles after the coincs are
    # ranked.  if the name of this option is changed, be sure to update
    # gstlal_inspiral_calc_likelihood
    parser.add_option("--singles-threshold", metavar = "SNR", type = "float", default = 8.0, help = "Set the SNR threshold at which to record single-instrument events in the output (default = 8).")

    options, filenames = parser.parse_args()

    #
    # check for options, files that are always required
    #

    missing_options = ["--%s" % option.replace("_", "-") for option in ["svd_bank", "output"] if getattr(options, option) is None]
    if options.likelihood_file is None and options.reference_likelihood_file is None:
        missing_options.append("either --likelihood-file or --reference-likelihood-file")

    if missing_options:
        raise ValueError("missing required option(s) %s" % ", ".join(sorted(missing_options)))

    detectors = datasource.GWDataSourceInfo(options)
    if len(detectors.channel_dict) < 2:
        raise ValueError("only coincident searches are supported: must process data from at least two antennae")

    # FIXME: put all svd banks for different detectors in one file.
    try:
        svd_banks = [inspiral.parse_svdbank_string(svdbank) for svdbank in options.svd_bank]
    except ValueError as e:
        print "Unable to parse svd banks"
        raise
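
    # for illustration, a --svd-bank argument such as
    # "H1:H1-bank.xml,L1:L1-bank.xml" (file names hypothetical) parses to
    # {"H1": "H1-bank.xml", "L1": "L1-bank.xml"}; svd_banks collects one
    # such dictionary per --svd-bank option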

    # FIXME: should also check for read permissions
    required_files = []
    for svd_bank_set in svd_banks:
        for instrument in svd_bank_set:
            required_files.append(svd_bank_set[instrument])
    if options.veto_segments_file:
        required_files += [options.veto_segments_file]
    missing_files = [filename for filename in required_files if not os.path.exists(filename)]

    if missing_files:
        raise ValueError("files %s do not exist" % ", ".join("'%s'" % filename for filename in sorted(missing_files)))

    #
    # check for mutually exclusive options
    #

    bad_combos = []
    if options.blind_injections and options.injections:
        bad_combos.append("(--blind-injections, --injections)")
    if options.likelihood_file and options.reference_likelihood_file:
        bad_combos.append("(--likelihood-file, --reference-likelihood-file)")

    if bad_combos:
        raise ValueError("must use only one option from each set: %s" % ", ".join(bad_combos))

    #
    # check sanity of options
    #

    # online-specific initialization
    # FIXME: someday support other online sources
    if options.data_source in ("lvshm", "framexmit"):
        missed_options = []
        for option in ["job_tag", "marginalized_likelihood_file"]:
            if getattr(options, option) is None:
                missed_options.append("--%s" % option.replace("_", "-"))

        if missed_options:
            raise ValueError("%s required when --data-source is lvshm or framexmit" % ", ".join(missed_options))

        if len(options.svd_bank) > 1:
            raise ValueError("more than one --svd-bank not allowed when --data-source is lvshm or framexmit, %d given" % len(options.svd_bank))

        # make an "infinite" extent segment
        detectors.seg = segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000))

        # this gets set so that if you log into a node you can find out what the job id is easily
        os.environ['GSTLAL_LL_JOB'] = options.job_tag
    else:
        bad_options = []
        for option in ["job_tag", "marginalized_likelihood_file", "likelihood_snapshot_interval", "reference_likelihood_file"]:
            if getattr(options, option) is not None:
                bad_options.append("--%s" % option.replace("_", "-"))
        if bad_options:
            raise ValueError("%s can only be given when --data-source is lvshm or framexmit" % ", ".join(bad_options))

    if options.reference_psd is None and not options.track_psd:
        raise ValueError("must use --track-psd if no reference PSD is given; the two can be used together")
    if options.local_frame_caching and not options.data_source == "frames":
        raise ValueError("--local-frame-caching can only be used if --data-source is \"frames\"")
    if options.chisq_type not in ["autochisq", "timeslicechisq"]:
        raise ValueError("--chisq-type must be one of (autochisq|timeslicechisq), given %s" % options.chisq_type)

    if options.reference_likelihood_file:
        likelihood_namedtuples_list = [namedtuple('likelihood_namedtuple', ('likelihood_file', 'reference_likelihood_file'))(None, options.reference_likelihood_file)]
    else:
        likelihood_namedtuples_list = [namedtuple('likelihood_namedtuple', ('likelihood_file', 'reference_likelihood_file'))(likelihood_file, None) for likelihood_file in options.likelihood_file]
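    # for illustration, giving --likelihood-file twice yields something like
    # [likelihood_namedtuple(likelihood_file = "file1", reference_likelihood_file = None),
    #  likelihood_namedtuple(likelihood_file = "file2", reference_likelihood_file = None)]
    # (file names hypothetical)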

    if not (len(options.svd_bank) == len(options.output) == len(likelihood_namedtuples_list)):
        raise ValueError("must have equal numbers of svd banks, output files and likelihood files")

    #
    # option checks complete
    #

    # do this before converting option types
    process_params = options.__dict__.copy()

    options.nxydump_segment, = segmentsUtils.from_range_strings([options.nxydump_segment], boundtype = LIGOTimeGPS)
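    # from_range_strings() turns a "start:stop" string into a GPS segment,
    # so, e.g., a hypothetical --nxydump-segment 874000000:874000100 should
    # become segment(LIGOTimeGPS(874000000), LIGOTimeGPS(874000100)), and
    # the default ":" becomes a segment covering all time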

    if options.blind_injections is not None:
        detectors.injection_filename = options.blind_injections

    # set up local caching
    if options.local_frame_caching:
        f, fname = tempfile.mkstemp(".cache")
        if options.verbose:
            print >> sys.stderr, "caching frame data locally to", fname
        f = os.fdopen(f, "w")  # reuse the descriptor returned by mkstemp() rather than leaking it
        # FIXME: should try to down-sample if possible.  there are
        # MDC data sets floating around whose streams do not start
        # on integer second boundaries, however, and it's possible
        # to trigger a failure in the frame muxer if those get
        # down-sampled so for now we're not doing any resampling.
        # later, when we don't care about those MDCs, we can go
        # back to down-sampling.  if not being able to down-sample
        # is a problem in the meantime, I think the best way
        # forward is to clip the start of said streams using a drop
        # element and a (to be written) buffer probe that figures
        # out how many samples to clip off the start of the stream.
        # FIXME: shouldn't use tempfile.gettempdir() directly, use
        # _CONDOR_SCRATCH_DIR like glue?
        # FIXME: note that at least for now condor sets TMPDIR to the
        # run scratch space so this *will* work properly
        detectors.local_cache_list = hoftcache.cache_hoft(detectors, output_path = tempfile.gettempdir(), verbose = options.verbose)
        for cacheentry in detectors.local_cache_list:
            # guarantee a LAL-cache-compliant file with only integer starts and durations
            cacheentry.segment = segments.segment(int(cacheentry.segment[0]), int(math.ceil(cacheentry.segment[1])))
            print >>f, str(cacheentry)
        detectors.frame_cache = fname

        # the injections are now present in the data so we don't want to do them twice
        detectors.injection_filename = None

    # optionally reconstruct segments around injections (but not blind injections!)
    if options.injections:
        reconstruction_segment_list = simulation.sim_inspiral_to_segment_list(options.injections)
    else:
        reconstruction_segment_list = None

    # we're done
    return options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list, likelihood_namedtuples_list


#
# =============================================================================
#
# Signal Handler
#
# =============================================================================
#

class OneTimeSignalHandler(object):
    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.count = 0

    def __call__(self, signum, frame):
        self.count += 1
        if self.count == 1:
            print >>sys.stderr, "*** SIG %d attempting graceful shutdown (this might take several minutes) ... ***" % signum
            try:
                # FIXME: how do I choose a timestamp?
                self.pipeline.get_bus().post(inspiral.message_new_checkpoint(self.pipeline, timestamp = now().ns()))
                if not self.pipeline.send_event(gst.event_new_eos()):
                    raise Exception("pipeline.send_event(EOS) returned failure")
            except Exception as e:
                print >>sys.stderr, "graceful shutdown failed: %s\naborting." % str(e)
                os._exit(1)
        else:
            print >>sys.stderr, "*** received SIG %d %d times ... ***" % (signum, self.count)
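
# this handler is installed below, for SIGINT and SIGTERM, in the online
# case only: the first signal posts a checkpoint message and sends an EOS
# event so the pipeline can flush and exit cleanly; repeated signals are
# merely reported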


#
# =============================================================================
#
# Main
#
# =============================================================================
#


#
# parse command line
#

options, filenames, process_params, svd_banks, detectors, reconstruction_segment_list, likelihood_namedtuples_list = parse_command_line()

if not options.check_time_stamps:
    pipeparts.mkchecktimestamps = lambda pipeline, src, *args: src
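# (the replacement lambda simply passes the source element through, so no
# checktimestamps elements are inserted into the pipeline)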


#
# Parse the veto segments file(s) if provided
#


if options.veto_segments_file is not None:
    veto_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.veto_segments_file, verbose = options.verbose, contenthandler = LIGOLWContentHandler), options.veto_segments_name).coalesce()
else:
    veto_segments = None


#
# set up the PSDs
#
# There are three modes for PSDs in this program:
# 1) --reference-psd without --track-psd - a fixed PSD (provided by the user) will be used to whiten the data
# 2) --track-psd without --reference-psd - a PSD will be measured and used on the fly
# 3) --track-psd with --reference-psd - a PSD will be measured on the fly, but the first guess will come from the user-provided PSD
#


if options.reference_psd is not None:
    psd = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lalseries.LIGOLWContentHandler))
else:
    psd = dict((instrument, None) for instrument in detectors.channel_dict)
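# for example, in mode (2) psd is a dictionary like {"H1": None, "L1": None}
# and a spectrum is measured on the fly for each instrument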


#
# Process svd banks in serial
#


for svd_bank, output_filename, likelihood_namedtuple in zip(svd_banks, options.output, likelihood_namedtuples_list):
    #
    # create a new, empty, Bottle application and make it the current
    # default, then start http server(s) to serve it up
    #


    bottle.default_app.push()
    # uncomment the next line to show tracebacks when something fails
    # in the web server
    #bottle.app().catchall = False
    httpservers = httpinterface.HTTPServers(0, bottle_app = bottle.default_app(), service_name = "gstlal_inspiral" + (" (%s)" % options.job_tag if options.job_tag is not None else ""), service_properties = {"job_tag": options.job_tag if options.job_tag is not None else ""}, verbose = options.verbose)


    #
    # Set up a registry of the resources that this job provides
    #


    @bottle.route("/")
    @bottle.route("/index.html")
    def index(job_tag = options.job_tag, instruments = set(svd_bank.keys())):
        host = socket.gethostname()
        server_address = "http://%s:%d" % (host, httpservers[0][0].port)
        yield "<html><body>\n<h3>%s %s %s</h3>\n<p>\n" % (job_tag, host, " ".join(sorted(instruments)))
        for route in sorted(bottle.default_app().routes, key = lambda route: route.rule):
            if route.rule in ("/", "/index.html"):
                # don't create links back to this page
                continue
            if route.method != "GET":
                # only create links for GET methods
                continue
            yield "<a href=\"%s%s\">%s</a><br>\n" % (server_address, route.rule, route.rule)
        yield "</p>\n</body></html>"
    # FIXME: get service-discovery working, then don't do this
    if "GSTLAL_LL_JOB" in os.environ:
        open("%s_registry.txt" % os.environ["GSTLAL_LL_JOB"], "w").write("http://%s:%s/\n" % (socket.gethostname(), httpservers[0][0].port))


    #
    # parse SVD template bank and expose bank metadata
    #


    banks = inspiral.parse_bank_files(svd_bank, verbose = options.verbose)

    @bottle.route("/bank.txt")
    def get_filter_length_and_chirpmass(banks = banks):
        bank = banks.values()[0][0]  # FIXME: maybe shouldn't just take the first ones
        yield '%.14g %.4g %.4g' % (float(now()), bank.filter_length, bank.sngl_inspiral_table[0].mchirp)
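    # the bank.txt route above serves one line of the form
    # "<gps now> <filter length> <chirp mass>", e.g. "1088858252 32 1.219"
    # (values illustrative)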


    #
    # Build pipeline
    #


    if options.verbose:
        print >>sys.stderr, "assembling pipeline ...",

    pipeline = gst.Pipeline("gstlal_inspiral")
    mainloop = gobject.MainLoop()

    triggersrc = lloidparts.mkLLOIDmulti(
        pipeline,
        detectors = detectors,
        banks = banks,
        psd = psd,
        psd_fft_length = options.psd_fft_length,
        ht_gate_threshold = options.ht_gate_threshold,
        veto_segments = veto_segments,
        verbose = options.verbose,
        nxydump_segment = options.nxydump_segment,
        chisq_type = options.chisq_type,
        track_psd = options.track_psd,
        control_peak_time = options.control_peak_time,
        fir_stride = options.fir_stride,
        reconstruction_segment_list = reconstruction_segment_list
    )


    if options.verbose:
        print >>sys.stderr, "done"


    #
    # Load likelihood ratio data; assume injections are present!
    #


    if options.data_source in ("lvshm", "framexmit"):
        coinc_params_distributions, ranking_data, seglists = far.parse_likelihood_control_doc(ligolw_utils.load_filename(likelihood_namedtuple[0] if likelihood_namedtuple[0] is not None else likelihood_namedtuple[1], verbose = options.verbose, contenthandler = far.ThincaCoincParamsDistributions.LIGOLWContentHandler))
        assert set(seglists) == set(detectors.channel_dict)
        if coinc_params_distributions is None:
            raise ValueError("\"%s\" does not contain parameter distribution data" % (likelihood_namedtuple[0] if likelihood_namedtuple[0] is not None else likelihood_namedtuple[1]))
    else:
        coinc_params_distributions, ranking_data, seglists = far.ThincaCoincParamsDistributions(), None, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in detectors.channel_dict)


    #
    # build output document
    #


    if options.verbose:
        print >>sys.stderr, "initializing output document ..."
    output = inspiral.Data(
        filename = output_filename or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.ifos_from_instrument_set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
        process_params = process_params,
        pipeline = pipeline,
        instruments = set(detectors.channel_dict),
        seg = detectors.seg or segments.segment(LIGOTimeGPS(0), LIGOTimeGPS(2000000000)),  # online data doesn't have a segment so make it all possible time
        coincidence_threshold = options.coincidence_threshold,
        coinc_params_distributions = coinc_params_distributions,
        ranking_data = ranking_data,
        marginalized_likelihood_file = options.marginalized_likelihood_file,
        likelihood_files_namedtuple = likelihood_namedtuple,
        injection_filename = options.injections,
        time_slide_file = options.time_slide_file,
        comment = options.comment,
        tmp_path = options.tmp_space,
        likelihood_snapshot_interval = options.likelihood_snapshot_interval,  # seconds
        thinca_interval = options.thinca_interval,
        sngls_snr_threshold = options.singles_threshold,
        gracedb_far_threshold = options.gracedb_far_threshold,
        gracedb_group = options.gracedb_group,
        gracedb_search = options.gracedb_search,
        gracedb_pipeline = options.gracedb_pipeline,
        verbose = options.verbose
    )
    if options.verbose:
        print >>sys.stderr, "... output document initialized"

    handler = lloidparts.Handler(mainloop, pipeline, output, instruments = set(detectors.channel_dict), tag = options.job_tag, seglistdict = seglists, verbose = options.verbose)


    if options.verbose:
        print >>sys.stderr, "attaching appsinks to pipeline ...",
    appsync = pipeparts.AppSync(appsink_new_buffer = output.appsink_new_buffer)
    appsinks = set(appsync.add_sink(pipeline, pipeparts.mkqueue(pipeline, src), caps = gst.Caps("application/x-lal-snglinspiral"), name = "%s_sink_%d" % (instrument, n)) for instrument, srcs in triggersrc.items() for n, src in enumerate(srcs))
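    # one appsink is attached per (instrument, sub-bank) trigger stream,
    # named e.g. "H1_sink_0", "H1_sink_1", ..., "L1_sink_0", ...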
    if options.verbose:
        print >>sys.stderr, "attached %d, done" % len(appsinks)


    #
    # if we request a dot graph of the pipeline, set it up
    #


    if options.write_pipeline is not None:
        pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
        pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)


    #
    # Run pipeline
    #


    if options.data_source in ("lvshm", "framexmit"):
        #
        # set up a SIGINT handler to shut down the pipeline.  this is
        # how the program stops gracefully; it is the only way to stop
        # it.  otherwise it runs forever.
        #
        # this is only done in the online case because we want
        # offline jobs to just die and not write partial databases
        #

        signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
        signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))


    if options.verbose:
        print >>sys.stderr, "setting pipeline state to playing ..."
    if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
        raise RuntimeError("pipeline did not enter playing state")

    if options.write_pipeline is not None:
        pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)

    if options.verbose:
        print >>sys.stderr, "running pipeline ..."
    mainloop.run()


    #
    # write output file
    #


    output.write_output_file(filename = output_filename or output.coincs_document.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)


    #
    # Cleanup template bank temp files
    #


    for ifo in banks:
        for bank in banks[ifo]:
            if options.verbose:
                print >> sys.stderr, "removing file:", bank.template_bank_filename
            os.remove(bank.template_bank_filename)


    #
    # Shut down the web interface servers and garbage collect the Bottle
    # app.  This should release the references the Bottle app's routes
    # hold to the pipeline's data (like template banks and so on).
    #


    del httpservers
    bottle.default_app.pop()


    #
    # Set pipeline state to NULL and garbage collect the handler
    #


    if pipeline.set_state(gst.STATE_NULL) != gst.STATE_CHANGE_SUCCESS:
        raise RuntimeError("pipeline could not be set to NULL")
    del handler.pipeline
    del output.pipeline
    del handler
    del bank
    del banks


#
# Cleanup local caches
#


if options.local_frame_caching:
    if options.verbose:
        print >>sys.stderr, "deleting temporary cache file", detectors.frame_cache
    os.remove(detectors.frame_cache)
    del detectors.local_cache_list


#
# done.  the online pipeline always ends with an error code so that dagman
# does not mark the job "done" and the job will be restarted when the dag
# is restarted.
#


if options.data_source in ("lvshm", "framexmit"):
    sys.exit(1)