gstlal-inspiral 0.4.2
inspiral.py
1 # Copyright (C) 2009-2013 Kipp Cannon, Chad Hanna, Drew Keppel
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 #
17 
18 ## @file
19 # The python module implementing the machinery needed by gstlal_inspiral
20 #
21 # ### Review Status
22 #
23 # STATUS: reviewed with actions
24 #
25 # | Names | Hash | Date |
26 # | ------------------------------------------- | ------------------------------------------- | ---------- |
27 # | Kipp Cannon, Chad Hanna, Jolien Creighton, Florent Robinet, B. Sathyaprakash, Duncan Meacher, T.G.G. Li | b8fef70a6bafa52e3e120a495ad0db22007caa20 | 2014-12-03 |
28 #
29 # #### Action items
30 # - Document examples of how to get SNR history, etc., to a web browser in an offline search
31 # - Long term goal: Using template duration (rather than chirp mass) should load balance the pipeline and improve statistics
32 # - L651: One thing to sort out is the signal probability while computing coincs
33 # - L640-L647: Get rid of obsolete comments
34 # - L667: Make sure timeslide events are not sent to GRACEDB
35 # - Lxxx: Can normalisation of the tail of the distribution be pre-computed using fake data?
36 # - L681: fmin should not be hard-coded to 10 Hz; horizon_distance will be horribly wrong if the psd is constructed, e.g., with a high-pass filter applied. For example, change the default to 40 Hz.
37 # - L817: If a gracedb upload fails, it should be possible to identify the failure and the specifics of the trigger that encountered it; a way of re-submitting the trigger to gracedb is important. Think about how to clean up failures.
38 # - Mimic gracedb upload failures and see if the code crashes
39 
40 
41 ## @package inspiral
42 
43 #
44 # =============================================================================
45 #
46 # Preamble
47 #
48 # =============================================================================
49 #
50 
51 
52 from collections import deque
53 import copy
54 import math
55 import numpy
56 import os
57 import resource
58 from scipy import random
59 try:
60  import sqlite3
61 except ImportError:
62  # pre 2.5.x
63  from pysqlite2 import dbapi2 as sqlite3
64 import StringIO
65 import subprocess
66 import sys
67 import threading
68 import time
69 import httplib
70 import tempfile
71 import shutil
72 
73 # The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
74 import pygtk
75 pygtk.require("2.0")
76 import gobject
77 gobject.threads_init()
78 import pygst
79 pygst.require('0.10')
80 import gst
81 
82 try:
83  from ligo import gracedb
84 except ImportError:
85  print >>sys.stderr, "warning: gracedb import failed, program will crash if gracedb uploads are attempted"
86 
87 from glue import iterutils
88 from glue import segments
89 from glue.ligolw import ligolw
90 from glue.ligolw import dbtables
91 from glue.ligolw import ilwd
92 from glue.ligolw import lsctables
93 from glue.ligolw import array as ligolw_array
94 from glue.ligolw import param as ligolw_param
95 from glue.ligolw import utils as ligolw_utils
96 from glue.ligolw.utils import ligolw_sqlite
97 from glue.ligolw.utils import ligolw_add
98 from glue.ligolw.utils import process as ligolw_process
99 from glue.ligolw.utils import search_summary as ligolw_search_summary
100 from glue.ligolw.utils import segments as ligolw_segments
101 from glue.ligolw.utils import time_slide as ligolw_time_slide
102 import lal
103 from pylal import rate
104 from pylal.datatypes import LALUnit
105 from pylal.datatypes import LIGOTimeGPS
106 from pylal.datatypes import REAL8FrequencySeries
107 from pylal.xlal.datatypes.snglinspiraltable import from_buffer as sngl_inspirals_from_buffer
108 
109 from gstlal import bottle
110 from gstlal import reference_psd
111 from gstlal import streamthinca
112 from gstlal import svd_bank
113 from gstlal import cbc_template_iir
114 from gstlal import far
115 
116 lsctables.LIGOTimeGPS = LIGOTimeGPS
117 
118 
119 #
120 # =============================================================================
121 #
122 # Misc
123 #
124 # =============================================================================
125 #
126 
127 
128 def message_new_checkpoint(src, timestamp = None):
129  s = gst.Structure("CHECKPOINT")
130  s.set_value("timestamp", timestamp)
131  return gst.message_new_application(src, s)
132 
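# Usage sketch (mirroring the call made in Data.appsink_new_buffer() below,
# with a hypothetical pipeline object and GPS-nanoseconds timestamp): the
# message is posted to the pipeline's bus, where the application's handler
# can react by checkpointing the output document.
#
#	pipeline.get_bus().post(message_new_checkpoint(pipeline, timestamp = 1000000000L))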
133 
134 def channel_dict_from_channel_list(channel_list, channel_dict = {"H1" : "LSC-STRAIN", "H2" : "LSC-STRAIN", "L1" : "LSC-STRAIN", "V1" : "LSC-STRAIN"}):
135  """
136  given a list of channels like ["H1=LSC-STRAIN",
137  "H2=SOMETHING-ELSE"], produce a dictionary of channel names keyed
138  by ifo. The default value is LSC-STRAIN for all detectors
139  """
140 
141  for channel in channel_list:
142  ifo = channel.split("=")[0]
143  chan = "".join(channel.split("=")[1:])
144  channel_dict[ifo] = chan
145 
146  return channel_dict
147 
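# Example of the mapping this performs (channel names invented). Note that
# the default dictionary is shared between calls, so entries accumulate
# across invocations.
#
#	>>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "L1=OAF-CAL-DARM"])
#	{'H1': 'LSC-STRAIN', 'H2': 'LSC-STRAIN', 'L1': 'OAF-CAL-DARM', 'V1': 'LSC-STRAIN'}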
148 
149 def pipeline_channel_list_from_channel_dict(channel_dict, opt = "channel-name"):
150  """
151  produce a string of channel name arguments suitable for a pipeline.py
152  program that doesn't technically allow multiple options; the first
153  entry is left bare, e.g. H1=LSC-STRAIN --channel-name=H2=LSC-STRAIN
154  """
155 
156  outstr = ""
157  for i, ifo in enumerate(channel_dict):
158  if i == 0:
159  outstr += "%s=%s " % (ifo, channel_dict[ifo])
160  else:
161  outstr += "--%s=%s=%s " % (opt, ifo, channel_dict[ifo])
162 
163  return outstr
164 
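# Sketch of the output (ordering follows dictionary iteration): the first
# entry is left bare so the whole string can serve as the value of a single
# --channel-name option.
#
#	>>> pipeline_channel_list_from_channel_dict({"H1": "LSC-STRAIN", "H2": "LSC-STRAIN"})
#	'H1=LSC-STRAIN --channel-name=H2=LSC-STRAIN '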
165 
166 def state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict = {"H1" : [0x7, 0x160], "L1" : [0x7, 0x160], "V1" : [0x67, 0x100]}):
167  """
168  """
169 
170  for line in on_bit_list:
171  ifo = line.split("=")[0]
172  bits = "".join(line.split("=")[1:])
173  try:
174  state_vector_on_off_dict[ifo][0] = int(bits)
175  except ValueError: # must be hex
176  state_vector_on_off_dict[ifo][0] = int(bits, 16)
177 
178  for line in off_bit_list:
179  ifo = line.split("=")[0]
180  bits = "".join(line.split("=")[1:])
181  try:
182  state_vector_on_off_dict[ifo][1] = int(bits)
183  except ValueError: # must be hex
184  state_vector_on_off_dict[ifo][1] = int(bits, 16)
185 
186  return state_vector_on_off_dict
187 
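# Parsing sketch (masks invented): the ValueError fallback means decimal and
# hex spellings are both accepted.
#
#	>>> d = state_vector_on_off_dict_from_bit_lists(["V1=0x67"], ["V1=0x100"])
#	>>> d["V1"]
#	[103, 256]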
188 
189 def state_vector_on_off_list_from_bits_dict(bit_dict):
190  """
191  """
192 
193  onstr = ""
194  offstr = ""
195  for i, ifo in enumerate(bit_dict):
196  if i == 0:
197  onstr += "%s=%s " % (ifo, bit_dict[ifo][0])
198  offstr += "%s=%s " % (ifo, bit_dict[ifo][1])
199  else:
200  onstr += "--state-vector-on-bits=%s=%s " % (ifo, bit_dict[ifo][0])
201  offstr += "--state-vector-off-bits=%s=%s " % (ifo, bit_dict[ifo][1])
202 
203  return onstr, offstr
204 
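# Formatting sketch (masks invented): as with the channel list above, the
# first entry of each string is bare so the pair can be appended to single
# --state-vector-on-bits= / --state-vector-off-bits= options.
#
#	>>> state_vector_on_off_list_from_bits_dict({"H1": [0x7, 0x160]})
#	('H1=7 ', 'H1=352 ')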
205 
206 def parse_svdbank_string(bank_string):
207  """
208  parses strings of form
209 
210  H1:bank1.xml,H2:bank2.xml,L1:bank3.xml
211 
212  into a dictionary of bank files, one per instrument.
213  """
214  out = {}
215  if bank_string is None:
216  return out
217  for b in bank_string.split(','):
218  ifo, bank = b.split(':')
219  if ifo in out:
220  raise ValueError("Only one svd bank per instrument should be given")
221  out[ifo] = bank
222  return out
223 
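# Parsing sketch (file names invented); a repeated instrument raises
# ValueError because only one svd bank per instrument is allowed.
#
#	>>> parse_svdbank_string("H1:bank1.xml,L1:bank3.xml")
#	{'H1': 'bank1.xml', 'L1': 'bank3.xml'}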
224 
225 def parse_iirbank_string(bank_string):
226  """
227  parses strings of form
228 
229  H1:bank1.xml,H2:bank2.xml,L1:bank3.xml,H2:bank4.xml,...
230 
231  into a dictionary of lists of bank files.
232  """
233  out = {}
234  if bank_string is None:
235  return out
236  for b in bank_string.split(','):
237  ifo, bank = b.split(':')
238  out.setdefault(ifo, []).append(bank)
239  return out
240 
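# Unlike the svd parser above, repeated instruments accumulate into lists
# (file names invented):
#
#	>>> parse_iirbank_string("H2:bank2.xml,H2:bank4.xml")
#	{'H2': ['bank2.xml', 'bank4.xml']}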
241 
242 def parse_bank_files(svd_banks, verbose, snr_threshold = None):
243  """
244  given a dictionary of lists of svd template bank file names parse them
245  into a dictionary of bank classes
246  """
247 
248  banks = {}
249 
250  for instrument, filename in svd_banks.items():
251  for n, bank in enumerate(svd_bank.read_banks(filename, contenthandler = LIGOLWContentHandler, verbose = verbose)):
252  # Write out sngl inspiral table to temp file for trigger generator
253  # FIXME teach the trigger generator to get this information a better way
254  bank.template_bank_filename = tempfile.NamedTemporaryFile(suffix = ".gz", delete = False).name
255  xmldoc = ligolw.Document()
256  # FIXME if this table reference is from a DB this is a problem (but it almost certainly isn't)
257  xmldoc.appendChild(ligolw.LIGO_LW()).appendChild(bank.sngl_inspiral_table.copy()).extend(bank.sngl_inspiral_table)
258  ligolw_utils.write_filename(xmldoc, bank.template_bank_filename, gz = True, verbose = verbose)
259  xmldoc.unlink() # help garbage collector
260  bank.logname = "%sbank%d" % (instrument, n)
261  banks.setdefault(instrument, []).append(bank)
262  if snr_threshold is not None:
263  bank.snr_threshold = snr_threshold
264 
265  # FIXME remove when this is no longer an issue
266  if not banks:
267  raise ValueError("Could not parse bank files into valid bank dictionary.\n\t- Perhaps you are using out-of-date svd bank files? Please ensure that they were generated with the same code version as the parsing code")
268  return banks
269 
270 def parse_iirbank_files(iir_banks, verbose, snr_threshold = 4.0):
271  """
272  given a dictionary of lists of iir template bank file names parse them
273  into a dictionary of bank classes
274  """
275 
276  banks = {}
277 
278  for instrument, files in iir_banks.items():
279  for n, filename in enumerate(files):
280  # FIXME: override the file name stored in the bank file
281  # with this file name. this bank I/O code needs to be fixed
282  bank = cbc_template_iir.load_iirbank(filename, snr_threshold, contenthandler = LIGOLWContentHandler, verbose = verbose)
283  bank.template_bank_filename = filename
284  bank.logname = "%sbank%d" % (instrument,n)
285  banks.setdefault(instrument,[]).append(bank)
286 
287  return banks
288 
289 
290 def subdir_from_T050017_filename(fname):
291  path = str(fname.split("-")[2])[:5]
292  try:
293  os.mkdir(path)
294  except OSError:
295  pass
296  return path
297 
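# Naming sketch (file name invented): the GPS start time is the third
# "-"-delimited field of a T050017 file name, so its first five digits name
# (and, if necessary, create) a subdirectory that groups output by epoch.
#
#	>>> subdir_from_T050017_filename("H1L1-FAKE_SEARCH-966384015-300.xml.gz")
#	'96638'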
298 
299 #
300 # =============================================================================
301 #
302 # glue.ligolw Content Handlers
303 #
304 # =============================================================================
305 #
306 
307 
308 class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
309  pass
310 ligolw_array.use_in(LIGOLWContentHandler)
311 ligolw_param.use_in(LIGOLWContentHandler)
312 lsctables.use_in(LIGOLWContentHandler)
313 
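# This handler is what lets Array, Param and lsctables rows be reconstructed
# when documents are loaded elsewhere in this module, e.g. (file name
# invented)
#
#	xmldoc = ligolw_utils.load_filename("bank.xml.gz", contenthandler = LIGOLWContentHandler, verbose = True)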
314 
315 #
316 # =============================================================================
317 #
318 # Parameter Distributions
319 #
320 # =============================================================================
321 #
322 
323 
324 #
325 # Functions to synthesize injections
326 #
327 
328 
329 def snr_distribution(size, startsnr):
330  """
331  This produces a power law distribution in snr of size size starting at startsnr
332  """
333  return startsnr * random.power(3, size)**-1 # numpy's power(a) draws from a density proportional to x**(a - 1) on (0, 1], so a = 3 gives x**2; inverting yields a power-law tail in snr
334 
335 
336 def noncentrality(snrs, prefactor):
337  """
338  This produces a set of noncentrality parameters that scale with snr^2 according to the prefactor
339  """
340  return prefactor * random.rand(len(snrs)) * snrs**2 # FIXME power depends on dimensionality of the bank and the expectation for the mismatch for real signals
341  #return prefactor * random.power(1, len(snrs)) * snrs**2 # FIXME power depends on dimensionality of the bank and the expectation for the mismatch for real signals
342 
343 
344 def chisq_distribution(df, non_centralities, size):
345  """
346  This produces a set of noncentral chisq values of size size, with degrees of freedom given by df
347  """
348  out = numpy.empty((len(non_centralities) * size,))
349  for i, nc in enumerate(non_centralities):
350  out[i*size:(i+1)*size] = random.noncentral_chisquare(df, nc, size)
351  return out
352 
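# A sketch of how the three synthesizers above chain together to fake a
# triggered population (all parameter values invented):
#
#	snrs = snr_distribution(1000, 5.5)	# 1000 snrs, all >= 5.5
#	ncs = noncentrality(snrs, 0.01)		# one noncentrality per snr
#	chisqs = chisq_distribution(40, ncs, 1)	# one noncentral chisq per snr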
353 
354 #
355 # =============================================================================
356 #
357 # Output Document
358 #
359 # =============================================================================
360 #
361 
362 
363 class CoincsDocument(object):
364  sngl_inspiral_columns = ("process_id", "ifo", "end_time", "end_time_ns", "eff_distance", "coa_phase", "mass1", "mass2", "snr", "chisq", "chisq_dof", "bank_chisq", "bank_chisq_dof", "sigmasq", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y", "spin2z", "event_id")
365 
366  def __init__(self, filename, process_params, comment, instruments, seg, injection_filename = None, time_slide_file = None, tmp_path = None, replace_file = None, verbose = False):
367  #
368  # how to make another like us
369  #
370 
371  self.get_another = lambda: CoincsDocument(filename = filename, process_params = process_params, comment = comment, instruments = instruments, seg = seg, injection_filename = injection_filename, time_slide_file = time_slide_file, tmp_path = tmp_path, replace_file = replace_file, verbose = verbose)
372 
373  #
374  # filename
375  #
376 
377  self.filename = filename
378 
379  #
380  # build the XML document
381  #
382 
383  self.xmldoc = ligolw.Document()
384  self.xmldoc.appendChild(ligolw.LIGO_LW())
385  self.process = ligolw_process.register_to_xmldoc(self.xmldoc, u"gstlal_inspiral", process_params, comment = comment, ifos = instruments)
386  self.search_summary = ligolw_search_summary.append_search_summary(self.xmldoc, self.process,
387  lalwrapper_cvs_tag = None, # FIXME
388  lal_cvs_tag = None, # FIXME
389  inseg = seg
390  )
391  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.SnglInspiralTable, columns = self.sngl_inspiral_columns))
392  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincDefTable))
393  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincTable))
394  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincMapTable))
395  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.TimeSlideTable))
396  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.CoincInspiralTable))
397  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.SegmentDefTable, columns = ligolw_segments.LigolwSegmentList.segment_def_columns))
398  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.SegmentSumTable, columns = ligolw_segments.LigolwSegmentList.segment_sum_columns))
399  self.xmldoc.childNodes[-1].appendChild(lsctables.New(lsctables.SegmentTable, columns = ligolw_segments.LigolwSegmentList.segment_columns))
400 
401  #
402  # optionally insert injection list document
403  #
404 
405  if injection_filename is not None:
406  ligolw_add.ligolw_add(self.xmldoc, [injection_filename], contenthandler = LIGOLWContentHandler, verbose = verbose)
407 
408  #
409  # optionally insert a time slide table document. if we
410  # don't have one, add an all-zero offset vector. remove
411  # duplicate offset vectors when done
412  #
413 
414  time_slide_table = lsctables.TimeSlideTable.get_table(self.xmldoc)
415  if time_slide_file is not None:
416  ligolw_add.ligolw_add(self.xmldoc, [time_slide_file], contenthandler = LIGOLWContentHandler, verbose = verbose)
417  else:
418  time_slide_table.append_offsetvector(dict.fromkeys(instruments, 0.0), self.process)
419  time_slide_mapping = ligolw_time_slide.time_slides_vacuum(time_slide_table.as_dict())
420  iterutils.inplace_filter(lambda row: row.time_slide_id not in time_slide_mapping, time_slide_table)
421  for tbl in self.xmldoc.getElementsByTagName(ligolw.Table.tagName):
422  tbl.applyKeyMapping(time_slide_mapping)
423 
424  #
425  # if the output is an sqlite database, build the sqlite
426  # database and convert the in-ram XML document to an
427  # interface to the database file
428  #
429 
430  if filename is not None and filename.endswith('.sqlite'):
431  self.working_filename = dbtables.get_connection_filename(filename, tmp_path = tmp_path, replace_file = replace_file, verbose = verbose)
432  self.connection = sqlite3.connect(self.working_filename, check_same_thread = False)
433  ligolw_sqlite.insert_from_xmldoc(self.connection, self.xmldoc, preserve_ids = False, verbose = verbose)
434 
435  #
436  # convert self.xmldoc into wrapper interface to
437  # database
438  #
439 
440  self.xmldoc.removeChild(self.xmldoc.childNodes[-1]).unlink()
441  self.xmldoc.appendChild(dbtables.get_xml(self.connection))
442 
443  # recover the process_id following the ID remapping
444  # that might have happened when the document was
445  # inserted. hopefully this query is unique enough
446  # to find exactly the one correct entry in the
447  # database
448 
449  (self.process.process_id,), = (self.search_summary.process_id,), = self.connection.cursor().execute("SELECT process_id FROM process WHERE program == ? AND node == ? AND username == ? AND unix_procid == ? AND start_time == ?", (self.process.program, self.process.node, self.process.username, self.process.unix_procid, self.process.start_time)).fetchall()
450  self.process.process_id = self.search_summary.process_id = ilwd.ilwdchar(self.process.process_id)
451  else:
452  self.connection = self.working_filename = None
453 
454  #
455  # retrieve references to the table objects, now that we
456  # know if they are database-backed or XML objects
457  #
458 
459  self.sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(self.xmldoc)
460  self.llwsegments = ligolw_segments.LigolwSegments(self.xmldoc, self.process)
461 
462 
463  def commit(self):
464  # update output document
465  if self.connection is not None:
466  self.connection.commit()
467 
468 
469  @property
470  def process_id(self):
471  return self.process.process_id
472 
473 
474  @property
475  def search_summary_outseg(self):
476  return self.search_summary.out_segment
477 
478 
479  def add_to_search_summary_outseg(self, seg):
480  out_segs = segments.segmentlist([self.search_summary_outseg])
481  if out_segs == [None]:
482  # out segment not yet initialized
483  del out_segs[:]
484  out_segs |= segments.segmentlist([seg])
485  self.search_summary.out_segment = out_segs.extent()
486 
487 
488  def get_next_sngl_id(self):
489  return self.sngl_inspiral_table.get_next_id()
490 
491 
492  def T050017_filename(self, description, extension):
493  start, end = self.search_summary_outseg
494  start, end = int(math.floor(start)), int(math.ceil(end))
495  return "%s-%s-%d-%d.%s" % ("".join(sorted(self.process.instruments)), description, start, end - start, extension)
496 
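 # Naming sketch (values invented): for instruments H1,L1, description
 # "LLOID" and an out segment spanning GPS [966384015.2, 966384315.7), this
 # yields "H1L1-LLOID-966384015-301.xml.gz" for extension "xml.gz": the
 # start is floored, the end ceiled, and the duration (not the end time) is
 # encoded.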
497 
498  def write_output_file(self, verbose = False):
499  self.llwsegments.finalize()
500  ligolw_process.set_process_end_time(self.process)
501 
502  if self.connection is not None:
503  seg = self.search_summary_outseg
504  # record the final state of the search_summary and
505  # process rows in the database
506  cursor = self.connection.cursor()
507  if seg is not None:
508  cursor.execute("UPDATE search_summary SET out_start_time = ?, out_start_time_ns = ?, out_end_time = ?, out_end_time_ns = ? WHERE process_id == ?", (seg[0].seconds, seg[0].nanoseconds, seg[1].seconds, seg[1].nanoseconds, self.search_summary.process_id))
509  cursor.execute("UPDATE search_summary SET nevents = (SELECT count(*) FROM sngl_inspiral) WHERE process_id == ?", (self.search_summary.process_id,))
510  cursor.execute("UPDATE process SET end_time = ? WHERE process_id == ?", (self.process.end_time, self.process.process_id))
511  cursor.close()
512  self.connection.commit()
513  dbtables.build_indexes(self.connection, verbose = verbose)
514  self.connection.close()
515  self.connection = None
516  dbtables.put_connection_filename(self.filename, self.working_filename, verbose = verbose)
517  else:
518  self.sngl_inspiral_table.sort(lambda a, b: cmp(a.end_time, b.end_time) or cmp(a.end_time_ns, b.end_time_ns) or cmp(a.ifo, b.ifo))
519  self.search_summary.nevents = len(self.sngl_inspiral_table)
520  ligolw_utils.write_filename(self.xmldoc, self.filename, gz = (self.filename or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None)
521 
522 
523 class Data(object):
524  def __init__(self, filename, process_params, pipeline, instruments, seg, coincidence_threshold, coinc_params_distributions, ranking_data, marginalized_likelihood_file = None, likelihood_files_namedtuple = None, injection_filename = None, time_slide_file = None, comment = None, tmp_path = None, likelihood_snapshot_interval = None, thinca_interval = 50.0, sngls_snr_threshold = None, gracedb_far_threshold = None, gracedb_group = "Test", gracedb_search = "LowMass", gracedb_pipeline = "gstlal", replace_file = True, verbose = False):
525  #
526  # initialize
527  #
528 
529  self.lock = threading.Lock()
530  self.pipeline = pipeline
531  self.verbose = verbose
532  # None to disable likelihood ratio assignment, otherwise a filename
533  self.marginalized_likelihood_file = marginalized_likelihood_file
534  self.likelihood_files_namedtuple = likelihood_files_namedtuple
535  # None to disable periodic snapshots, otherwise seconds
536  # set to 1.0 to disable background data decay
537  self.likelihood_snapshot_interval = likelihood_snapshot_interval
538  self.likelihood_snapshot_timestamp = None
539  # gracedb far threshold
540  self.gracedb_far_threshold = gracedb_far_threshold
541  self.gracedb_group = gracedb_group
542  self.gracedb_search = gracedb_search
543  self.gracedb_pipeline = gracedb_pipeline
544 
545  #
546  # setup bottle routes
547  #
548 
549  bottle.route("/latency_histogram.txt")(self.web_get_latency_histogram)
550  bottle.route("/latency_history.txt")(self.web_get_latency_history)
551  bottle.route("/snr_history.txt")(self.web_get_snr_history)
552  bottle.route("/ram_history.txt")(self.web_get_ram_history)
553  bottle.route("/likelihood.xml")(self.web_get_likelihood_file)
554  bottle.route("/gracedb_far_threshold.txt", method = "GET")(self.web_get_gracedb_far_threshold)
555  bottle.route("/gracedb_far_threshold.txt", method = "POST")(self.web_set_gracedb_far_threshold)
556  bottle.route("/sngls_snr_threshold.txt", method = "GET")(self.web_get_sngls_snr_threshold)
557  bottle.route("/sngls_snr_threshold.txt", method = "POST")(self.web_set_sngls_snr_threshold)
558 
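 # These routes serve plain text over HTTP. Assuming the program's bottle
 # server is listening on some host and port (invented below), the
 # histories can be polled and the thresholds adjusted remotely, e.g.
 #
 #	>>> import urllib
 #	>>> print urllib.urlopen("http://localhost:16953/snr_history.txt").read()
 #	>>> urllib.urlopen("http://localhost:16953/gracedb_far_threshold.txt", urllib.urlencode({"rate": "1e-4"})).read()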
559  #
560  # initialize document to hold coincs and segments
561  #
562 
563  self.coincs_document = CoincsDocument(filename, process_params, comment, instruments, seg, injection_filename = injection_filename, time_slide_file = time_slide_file, tmp_path = tmp_path, replace_file = replace_file, verbose = verbose)
564 
565  #
566  # attach a StreamThinca instance to ourselves
567  #
568 
569  self.stream_thinca = streamthinca.StreamThinca(
570  coincidence_threshold = coincidence_threshold,
571  thinca_interval = thinca_interval, # seconds
572  sngls_snr_threshold = sngls_snr_threshold
573  )
574 
575  #
576  # setup likelihood ratio book-keeping. seglistdicts to be
577  # populated by the pipeline handler
578  #
579 
580  self.coinc_params_distributions = coinc_params_distributions
581  self.ranking_data = ranking_data
582  self.seglistdicts = None
583  self.fapfar = None
584 
585  #
586  # Fun output stuff
587  #
588 
589  self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),)))
590  self.latency_history = deque(maxlen = 1000)
591  self.snr_history = deque(maxlen = 1000)
592  self.ram_history = deque(maxlen = 1000)
593 
594  def appsink_new_buffer(self, elem):
595  with self.lock:
596  # retrieve triggers from appsink element
597  buf = elem.emit("pull-buffer")
598  events = sngl_inspirals_from_buffer(buf)
599  # FIXME: ugly way to get the instrument
600  instrument = elem.get_name().split("_")[0]
601 
602  # update search_summary out segment and our
603  # livetime
604  buf_timestamp = LIGOTimeGPS(0, buf.timestamp)
605  buf_seg = segments.segment(buf_timestamp, buf_timestamp + LIGOTimeGPS(0, buf.duration))
606  self.coincs_document.add_to_search_summary_outseg(buf_seg)
607  self.seglistdicts["triggersegments"][instrument] |= segments.segmentlist((buf_seg,))
608 
609  # set metadata on triggers. because this uses the
610  # ID generator attached to the database-backed
611  # sngl_inspiral table, and that generator has been
612  # synced to the database's contents, the IDs
613  # assigned here will not collide with any already
614  # in the database
615  for event in events:
616  event.process_id = self.coincs_document.process_id
617  event.event_id = self.coincs_document.get_next_sngl_id()
618 
619  # update likelihood snapshot if needed
620  if self.likelihood_snapshot_interval is not None and (self.likelihood_snapshot_timestamp is None or buf_timestamp - self.likelihood_snapshot_timestamp >= self.likelihood_snapshot_interval):
621  self.likelihood_snapshot_timestamp = buf_timestamp
622 
623  # smooth the distributions. re-populates
624  # PDF arrays from raw counts
625  self.coinc_params_distributions.finish(verbose = self.verbose)
626 
627  # post a checkpoint message. FIXME: make
628  # sure this triggers
629  # self.snapshot_output_file() to be
630  # invoked. lloidparts takes care of that
631  # for now, but spreading the program logic
632  # around like that isn't a good idea, this
633  # code should be responsible for it
634  # somehow, no?
635  self.pipeline.get_bus().post(message_new_checkpoint(self.pipeline, timestamp = buf_timestamp.ns()))
636 
637  # If a reference likelihood file is given, load
638  # the coinc_params_distributions data from it
639  # before smoothing the distributions
640  # FIXME There is currently no guarantee that
641  # the reference_likelihood_file on disk will
642  # have updated since the last snapshot, but for
643  # our purpose it should not have that large of
644  # an effect. The data loaded should never be
645  # older than the snapshot before last
646  if self.likelihood_files_namedtuple.reference_likelihood_file is not None:
647  self.coinc_params_distributions = far.parse_likelihood_control_doc(ligolw_utils.load_filename(self.likelihood_files_namedtuple.reference_likelihood_file, verbose = self.verbose, contenthandler = far.ThincaCoincParamsDistributions.LIGOLWContentHandler))[0]
648  self.coinc_params_distributions.finish(verbose = self.verbose)
649 
650  if self.marginalized_likelihood_file is not None:
651  # FIXME: must set horizon
652  # distances in coinc params object
653 
654  # enable streamthinca's likelihood
655  # ratio assignment using our own,
656  # local, parameter distribution
657  # data
658  self.stream_thinca.coinc_params_distributions = self.coinc_params_distributions
659 
660  # read the marginalized likelihood
661  # ratio distributions that have
662  # been updated asynchronously and
663  # initialize a FAP/FAR assignment
664  # machine from it.
665  ranking_data, seglists = far.parse_likelihood_control_doc(ligolw_utils.load_filename(self.marginalized_likelihood_file, verbose = self.verbose, contenthandler = far.ThincaCoincParamsDistributions.LIGOLWContentHandler))[1:]
666  if ranking_data is None:
667  raise ValueError("\"%s\" does not contain ranking statistic PDFs" % self.marginalized_likelihood_file)
668  ranking_data.finish(verbose = self.verbose)
669  self.fapfar = far.FAPFAR(ranking_data, livetime = far.get_live_time(seglists))
670 
671  # run stream thinca. update the parameter
672  # distribution data from sngls that weren't used in
673  # coincs
674  for event in self.stream_thinca.add_events(self.coincs_document.xmldoc, self.coincs_document.process_id, events, buf_timestamp, fapfar = self.fapfar):
675  self.coinc_params_distributions.add_background(self.coinc_params_distributions.coinc_params((event,), None))
676  self.coincs_document.commit()
677 
678  # update zero-lag coinc bin counts in
679  # coinc_params_distributions. NOTE: if likelihood
680  # ratios are known then these are the counts of
681  # occurrences of parameters in coincs above
682  # threshold, otherwise they are the counts of
683  # occurrences of parameters in all coincs. knowing
684  # the meaning of the counts that get recorded is
685  # left as an exercise to the user
686  if self.stream_thinca.last_coincs:
687  for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
688  offset_vector = self.stream_thinca.last_coincs.offset_vector(coinc_event.time_slide_id)
689  if (coinc_event.likelihood >= far.RankingData.ln_likelihood_ratio_threshold or self.marginalized_likelihood_file is None) and not any(offset_vector.values()):
690  self.coinc_params_distributions.add_zero_lag(self.coinc_params_distributions.coinc_params(self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id), offset_vector))
691 
692  # Cluster last coincs before recording number of zero
693  # lag events or sending alerts to gracedb
694  # FIXME Do proper clustering that saves states between
695  # thinca intervals and uses an independent clustering
696  # window. This can also go wrong if there are multiple
697  # events with an identical likelihood. It will just
698  # choose the event with the highest event id
699  if self.stream_thinca.last_coincs:
700  self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.iteritems(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)])
701 
702  # Add events to the observed likelihood histogram post "clustering"
703  # FIXME proper clustering is really needed (see above)
704  if self.stream_thinca.last_coincs:
705  for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
706  offset_vector = self.stream_thinca.last_coincs.offset_vector(coinc_event.time_slide_id)
707  # ifos come from coinc_inspiral, not coinc_event
708  ifos = self.stream_thinca.last_coincs.coinc_inspiral_index[coinc_event_id].ifos
709  if (coinc_event.likelihood is not None and coinc_event.likelihood >= far.RankingData.ln_likelihood_ratio_threshold) and not any(offset_vector.values()):
710  self.ranking_data.zero_lag_likelihood_rates[frozenset(lsctables.instrument_set_from_ifos(ifos))][coinc_event.likelihood,] += 1
711 
712  # do GraceDB alerts
713  if self.gracedb_far_threshold is not None:
714  self.__do_gracedb_alerts()
715  self.__update_eye_candy()
716 
717  def record_horizon_distance(self, instrument, timestamp, psd, m1, m2, snr_threshold = 8.0):
718  with self.lock:
719  # FIXME get frequency bounds from templates
720  horizon_distance = reference_psd.horizon_distance(psd, m1 = m1, m2 = m2, snr = snr_threshold, f_min = 40.0, f_max = 0.85 * (psd.f0 + (len(psd.data) - 1) * psd.deltaF))
721  assert not (math.isnan(horizon_distance) or math.isinf(horizon_distance))
722  # NOTE: timestamp is cast to float. should be
723  # safe, whitener should be reporting PSDs with
724  # integer timestamps. anyway, we don't need
725  # nanosecond precision for the horizon distance
726  # history.
727  try:
728  horizon_history = self.coinc_params_distributions.horizon_history[instrument]
729  except KeyError:
730  horizon_history = self.coinc_params_distributions.horizon_history[instrument] = far.NearestLeafTree()
731  horizon_history[float(timestamp)] = horizon_distance
732 
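 # Worked example of the f_max cap above (numbers invented): for a PSD with
 # f0 = 0 Hz, deltaF = 0.25 Hz and 8193 samples, the Nyquist frequency is
 # 0.25 * 8192 = 2048 Hz, so f_max = 0.85 * 2048 = 1740.8 Hz, keeping the
 # horizon distance integral safely below the band edge.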
733  def __get_likelihood_file(self):
734  # generate a coinc parameter distribution document. NOTE:
735  # likelihood ratio PDFs *are* included if they were present in
736  # the --likelihood-file that was loaded.
737  xmldoc = ligolw.Document()
738  xmldoc.appendChild(ligolw.LIGO_LW())
739  process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {})
740  search_summary = ligolw_search_summary.append_search_summary(xmldoc, process, ifos = self.seglistdicts["triggersegments"].keys(), inseg = self.seglistdicts["triggersegments"].extent_all(), outseg = self.seglistdicts["triggersegments"].extent_all())
741  # FIXME: now that we've got all kinds of segment lists
742  # being collected, decide which of them should go here.
743  far.gen_likelihood_control_doc(xmldoc, process, self.coinc_params_distributions, self.ranking_data, self.seglistdicts["triggersegments"])
744  ligolw_process.set_process_end_time(process)
745  return xmldoc
746 
747  def web_get_likelihood_file(self):
748  with self.lock:
749  output = StringIO.StringIO()
750  ligolw_utils.write_fileobj(self.__get_likelihood_file(), output, trap_signals = None)
751  outstr = output.getvalue()
752  output.close()
753  return outstr
754 
755  def __flush(self):
756  # run StreamThinca's .flush(). returns the last remaining
757  # non-coincident sngls. add them to the distribution
758  for event in self.stream_thinca.flush(self.coincs_document.xmldoc, self.coincs_document.process_id, fapfar = self.fapfar):
759  self.coinc_params_distributions.add_background(self.coinc_params_distributions.coinc_params((event,), None))
760  self.coincs_document.commit()
761 
762  # update zero-lag bin counts in coinc_params_distributions
763  if self.stream_thinca.last_coincs:
764  for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
765  offset_vector = self.stream_thinca.last_coincs.offset_vector(coinc_event.time_slide_id)
766  if (coinc_event.likelihood >= far.RankingData.ln_likelihood_ratio_threshold or self.marginalized_likelihood_file is None) and not any(offset_vector.values()):
767  self.coinc_params_distributions.add_zero_lag(self.coinc_params_distributions.coinc_params(self.stream_thinca.last_coincs.sngl_inspirals(coinc_event_id), offset_vector))
768 
769  # Cluster last coincs before recording number of zero
770  # lag events or sending alerts to gracedb
771  # FIXME Do proper clustering that saves states between
772  # thinca intervals and uses an independent clustering
773  # window. This can also go wrong if there are multiple
774  # events with an identical likelihood. It will just
775  # choose the event with the highest event id
776  if self.stream_thinca.last_coincs:
777  self.stream_thinca.last_coincs.coinc_event_index = dict([max(self.stream_thinca.last_coincs.coinc_event_index.iteritems(), key = lambda (coinc_event_id, coinc_event): coinc_event.likelihood)])
778 
779  # Add events to the observed likelihood histogram post "clustering"
780  # FIXME proper clustering is really needed (see above)
781  if self.stream_thinca.last_coincs:
782  for coinc_event_id, coinc_event in self.stream_thinca.last_coincs.coinc_event_index.items():
783  offset_vector = self.stream_thinca.last_coincs.offset_vector(coinc_event.time_slide_id)
784  # ifos come from coinc_inspiral, not coinc_event
785  ifos = self.stream_thinca.last_coincs.coinc_inspiral_index[coinc_event_id].ifos
786  if (coinc_event.likelihood is not None and coinc_event.likelihood >= far.RankingData.ln_likelihood_ratio_threshold) and not any(offset_vector.values()):
787  self.ranking_data.zero_lag_likelihood_rates[frozenset(lsctables.instrument_set_from_ifos(ifos))][coinc_event.likelihood,] += 1
788 
789  # do GraceDB alerts
790  if self.gracedb_far_threshold is not None:
791  self.__do_gracedb_alerts()
792 
793  def flush(self):
794  with self.lock:
795  self.__flush()
796 
797  def __do_gracedb_alerts(self):
798  if self.stream_thinca.last_coincs:
799  gracedb_client = gracedb.Client()
800  gracedb_ids = []
801  psdmessage = None
802  coinc_inspiral_index = self.stream_thinca.last_coincs.coinc_inspiral_index
803 
804  # This appears to be a silly for loop since
805  # coinc_event_index will only have one value, but we're
806  # future-proofing it for the point when it could
807  # contain multiple clustered events
808  for coinc_event in self.stream_thinca.last_coincs.coinc_event_index.values():
809  #
810  # continue if the false alarm rate is not low
811  # enough, or is nan.
812  #
813 
814  if coinc_inspiral_index[coinc_event.coinc_event_id].combined_far is None or coinc_inspiral_index[coinc_event.coinc_event_id].combined_far > self.gracedb_far_threshold or numpy.isnan(coinc_inspiral_index[coinc_event.coinc_event_id].combined_far):
815  continue
816 
817  #
818  # retrieve PSDs
819  #
820 
821  if psdmessage is None:
822  if self.verbose:
823  print >>sys.stderr, "retrieving PSDs from whiteners and generating psd.xml.gz ..."
824  psddict = {}
825  for instrument in self.seglistdicts["triggersegments"]:
826  elem = self.pipeline.get_by_name("lal_whiten_%s" % instrument)
827  # FIXME: remove
828  # LIGOTimeGPS type cast
829  # when we port to swig
830  # version of
831  # REAL8FrequencySeries
832  psddict[instrument] = REAL8FrequencySeries(
833  name = "PSD",
834  epoch = LIGOTimeGPS(lal.UTCToGPS(time.gmtime()), 0),
835  f0 = 0.0,
836  deltaF = elem.get_property("delta-f"),
837  sampleUnits = LALUnit("s strain^2"), # FIXME: don't hard-code this
838  data = numpy.array(elem.get_property("mean-psd"))
839  )
840  psdmessage = StringIO.StringIO()
841  reference_psd.write_psd_fileobj(psdmessage, psddict, gz = True, trap_signals = None)
842 
843  #
844  # fake a filename for end-user convenience
845  #
846 
847  observatories = "".join(sorted(set(instrument[0] for instrument in self.seglistdicts["triggersegments"])))
848  instruments = "".join(sorted(self.seglistdicts["triggersegments"]))
849  description = "%s_%s_%s_%s" % (instruments, ("%.4g" % coinc_inspiral_index[coinc_event.coinc_event_id].mass).replace(".", "_").replace("-", "_"), self.gracedb_group, self.gracedb_search)
850  end_time = int(coinc_inspiral_index[coinc_event.coinc_event_id].get_end())
851  filename = "%s-%s-%d-%d.xml" % (observatories, description, end_time, 0)
852 
853  #
854  # construct message and send to gracedb.
855  # we go through the intermediate step of
856  # first writing the document into a string
857  # buffer in case there is some safety in
858  # doing so in the event of a malformed
859  # document, instead of writing directly
860  # into gracedb's input pipe and crashing
861  # part way through.
862  #
863 
864  if self.verbose:
865  print >>sys.stderr, "sending %s to gracedb ..." % filename
866  message = StringIO.StringIO()
867  xmldoc = self.stream_thinca.last_coincs[coinc_event.coinc_event_id]
868  # give the alert all the standard inspiral
869  # columns (attributes should all be
870  # populated). FIXME: ugly.
871  sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc)
872  for standard_column in ("process_id", "ifo", "search", "channel", "end_time", "end_time_ns", "end_time_gmst", "impulse_time", "impulse_time_ns", "template_duration", "event_duration", "amplitude", "eff_distance", "coa_phase", "mass1", "mass2", "mchirp", "mtotal", "eta", "kappa", "chi", "tau0", "tau2", "tau3", "tau4", "tau5", "ttotal", "psi0", "psi3", "alpha", "alpha1", "alpha2", "alpha3", "alpha4", "alpha5", "alpha6", "beta", "f_final", "snr", "chisq", "chisq_dof", "bank_chisq", "bank_chisq_dof", "cont_chisq", "cont_chisq_dof", "sigmasq", "rsqveto_duration", "Gamma0", "Gamma1", "Gamma2", "Gamma3", "Gamma4", "Gamma5", "Gamma6", "Gamma7", "Gamma8", "Gamma9", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y", "spin2z", "event_id"):
873  try:
874  sngl_inspiral_table.appendColumn(standard_column)
875  except ValueError:
876  # already has it
877  pass
878  ligolw_utils.write_fileobj(xmldoc, message, gz = False, trap_signals = None)
879  xmldoc.unlink()
880  # FIXME: make this optional from command line?
881  if True:
882  resp = gracedb_client.createEvent(self.gracedb_group, self.gracedb_pipeline, filename, filecontents = message.getvalue(), search = self.gracedb_search)
883  resp_json = resp.json()
884  if resp.status != httplib.CREATED:
885  print >>sys.stderr, "gracedb upload of %s failed" % filename
886  else:
887  if self.verbose:
888  print >>sys.stderr, "event assigned grace ID %s" % resp_json["graceid"]
889  gracedb_ids.append(resp_json["graceid"])
890  else:
891  proc = subprocess.Popen(("/bin/cp", "/dev/stdin", filename), stdin = subprocess.PIPE)
892  proc.stdin.write(message.getvalue())
893  proc.stdin.flush()
894  proc.stdin.close()
895  message.close()
896 
897  #
898  # do PSD file uploads
899  #
900 
901  if psdmessage is not None:
902  filename = "psd.xml.gz"
903  for gracedb_id in gracedb_ids:
904  resp = gracedb_client.writeLog(gracedb_id, "strain spectral densities", filename = filename, filecontents = psdmessage.getvalue(), tagname = "psd")
905  resp_json = resp.json()
906  if resp.status != httplib.CREATED:
907  print >>sys.stderr, "gracedb upload of %s for ID %s failed" % (filename, gracedb_id)
908 
909  def do_gracedb_alerts(self):
910  with self.lock:
911  self.__do_gracedb_alerts()
912 
913  def __update_eye_candy(self):
914  if self.stream_thinca.last_coincs:
915  self.ram_history.append((time.time(), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB
916  latency_val = None
917  snr_val = (0,0)
918  coinc_inspiral_index = self.stream_thinca.last_coincs.coinc_inspiral_index
919  for coinc_event_id, coinc_inspiral in coinc_inspiral_index.items():
920  # FIXME: update when a proper column is available
921  latency = coinc_inspiral.minimum_duration
922  self.latency_histogram[latency,] += 1
923  if latency_val is None:
924  t = float(coinc_inspiral_index[coinc_event_id].get_end())
925  latency_val = (t, latency)
926  snr = coinc_inspiral_index[coinc_event_id].snr
927  if snr >= snr_val[1]:
928  t = float(coinc_inspiral_index[coinc_event_id].get_end())
929  snr_val = (t, snr)
930  if latency_val is not None:
931  self.latency_history.append(latency_val)
932  if snr_val != (0,0):
933  self.snr_history.append(snr_val)
934 
935  def update_eye_candy(self):
936  with self.lock:
937  self.__update_eye_candy()
938 
939  def web_get_latency_histogram(self):
940  with self.lock:
941  for latency, number in zip(self.latency_histogram.centres()[0][1:-1], self.latency_histogram.array[1:-1]):
942  yield "%e %e\n" % (latency, number)
943 
944  def web_get_latency_history(self):
945  with self.lock:
946  # each entry is a (time stamp, latency) pair
947  for time, latency in self.latency_history:
948  yield "%f %e\n" % (time, latency)
949 
950  def web_get_snr_history(self):
951  with self.lock:
952  # each entry is a (time stamp, snr) pair
953  for time, snr in self.snr_history:
954  yield "%f %e\n" % (time, snr)
955 
956  def web_get_ram_history(self):
957  with self.lock:
958  # each entry is a (time stamp, ram in GB) pair
959  for time, ram in self.ram_history:
960  yield "%f %e\n" % (time, ram)
961 
962  def web_get_gracedb_far_threshold(self):
963  with self.lock:
964  if self.gracedb_far_threshold is not None:
965  yield "rate=%.17g\n" % self.gracedb_far_threshold
966  else:
967  yield "rate=\n"
968 
969  def web_set_gracedb_far_threshold(self):
970  try:
971  with self.lock:
972  rate = bottle.request.forms["rate"]
973  if rate:
974  self.gracedb_far_threshold = float(rate)
975  yield "OK: rate=%.17g\n" % self.gracedb_far_threshold
976  else:
977  self.gracedb_far_threshold = None
978  yield "OK: rate=\n"
979  except:
980  yield "error\n"
981 
982  def web_get_sngls_snr_threshold(self):
983  with self.lock:
984  if self.stream_thinca.sngls_snr_threshold is not None:
985  yield "snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold
986  else:
987  yield "snr=\n"
988 
989  def web_set_sngls_snr_threshold(self):
990  try:
991  with self.lock:
992  snr_threshold = bottle.request.forms["snr"]
993  if snr_threshold:
994  self.stream_thinca.sngls_snr_threshold = float(snr_threshold)
995  yield "OK: snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold
996  else:
997  self.stream_thinca.sngls_snr_threshold = None
998  yield "OK: snr=\n"
999  except:
1000  yield "error\n"
1001 
1002  def __write_output_file(self, filename = None, verbose = False):
1003  self.__flush()
1004 
1005  # FIXME: should this be done in .flush() somehow?
1006  for segtype, seglistdict in self.seglistdicts.items():
1007  self.coincs_document.llwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID")
1008 
1009  if filename is not None:
1010  self.coincs_document.filename = filename
1011  self.coincs_document.write_output_file(verbose = verbose)
1012 
1013  def __write_likelihood_file(self, filename, description, snapshot = False, verbose = False):
1014  # write the parameter PDF file. NOTE: this file contains
1015  # raw bin counts, and might or might not contain smoothed,
1016  # normalized PDF arrays, but if it does they will not
1017  # necessarily correspond to the bin counts.
1018  #
1019  # the parameter PDF arrays cannot be re-computed here
1020  # because it would interfere with their use by stream
1021  # thinca. we want to know precisely when the arrays get
1022  # updated so we can have a hope of computing the likelihood
1023  # ratio PDFs correctly.
1024 
1025  path, filename = os.path.split(filename)
1026  tmp_likelihood_file = os.path.join(path, 'tmp_%s' % filename)
1027  ligolw_utils.write_filename(self.__get_likelihood_file(), tmp_likelihood_file, gz = (filename or "stdout").endswith(".gz"), verbose = verbose, trap_signals = None)
1028  shutil.move(tmp_likelihood_file, os.path.join(path,filename))
1029  # Snapshots get their own custom file and path
1030  if snapshot:
1031  fname = self.coincs_document.T050017_filename(description + '_DISTSTATS', 'xml.gz')
1032  path = subdir_from_T050017_filename(fname)
1033  shutil.copy(os.path.join(path,filename), os.path.join(path, fname))
1034 
1035  def write_output_file(self, filename = None, description = "", verbose = False):
1036  with self.lock:
1037  self.__write_output_file(filename = filename, verbose = verbose)
1038  if self.likelihood_files_namedtuple.likelihood_file:
1039  self.__write_likelihood_file(self.likelihood_files_namedtuple.likelihood_file, description, verbose = verbose)
1040 
1041  # can't be used anymore
1042  del self.coincs_document
1043 
1044  def snapshot_output_file(self, description, extension, verbose = False):
1045  with self.lock:
1046  coincs_document = self.coincs_document.get_another()
1047  # We require the likelihood file to have the same name
1048  # as the input to this program to accumulate statistics
1049  # as we go
1050  fname = self.coincs_document.T050017_filename(description, extension)
1051  fname = os.path.join(subdir_from_T050017_filename(fname), fname)
1052  self.__write_output_file(filename = fname, verbose = verbose)
1053  if self.likelihood_files_namedtuple.likelihood_file:
1054  self.__write_likelihood_file(self.likelihood_files_namedtuple.likelihood_file, description, snapshot = True, verbose = verbose)
1055 
1056  # can't be used anymore
1057  del self.coincs_document
1058  self.coincs_document = coincs_document