gstlal-inspiral  0.4.2
 All Classes Namespaces Files Functions Variables Pages
lloidparts.py
Go to the documentation of this file.
1 # Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 ##
18 # @file
19 #
20 # A file that contains the lloidparts module code; Roughly speaking it
21 # implements the algorithm described in <a
22 # href=http://arxiv.org/abs/1107.2665>ApJ 748 136 (2012)</a>
23 #
24 #
25 # Review Status
26 #
27 # | Names | Hash | Date |
28 # | ------------------------------------------- | ------------------------------------------- | ---------- |
29 # | Sathya, Duncan Me, Jolien, Kipp, Chad | 4bb3204b4ea43e4508dc279ddcfc417366c72841 | 2014-05-02 |
30 #
31 # #### Actions
32 # - Feature request: do checkpointing when instruments are down
33 # - Feature request: need to hook up equivalent of "CAT 2" vetoes to online analysis when ready, and HW inj veto, etc.
34 # - Document the parameters in mkLLOIDmulti()
35 # - Check if bank ids are the same across different instruments
36 # - Feature request: Make time-frequency videos for interesting events
37 # - Inject signals of known, high SNR and measure loss
38 # - move the makesegmentsrcgate to before the matrix mixer, not in the current sum-of-squares control loop
39 # - Make conditional branches in the graph gray boxes
40 # - consider zero padding the beginning of jobs to get rid of mkdrop()
41 # - Decide how to properly normalize SNRs for incomplete filter segments (currently filters are not renormalized so the loss in SNR is bigger than might be expected)
42 # - Check and possibly fix the function that switches between time-domain and FFT convolution based on stride and number of samples
43 # - Consider if quality = 9 is right for downsampling (it should be plenty good, but maybe excessive?)
44 #
45 #
46 # #### Functions/classes not reviewed since they will be moved
47 # - DetectorData
48 # - mkSPIIRmulti
49 # - mkSPIIRhoftToSnrSlices
50 # - mkLLOIDSnrSlicesToTimeSliceChisq
51 # - mkLLOIDSnrChisqToTriggers
52 
53 ##
54 # @package lloidparts
55 #
56 # a module for building gstreamer graphs of the LLOID algorithm
57 #
58 
59 
60 #
61 # =============================================================================
62 #
63 # Preamble
64 #
65 # =============================================================================
66 #
67 
68 
69 import math
70 import sys
71 import os
72 import numpy
73 import warnings
74 import StringIO
75 
76 
77 # The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
78 import pygtk
79 pygtk.require("2.0")
80 import gobject
81 gobject.threads_init()
82 import pygst
83 pygst.require('0.10')
84 import gst
85 
86 
87 from glue import iterutils
88 from glue import segments
89 from glue.ligolw import ligolw
90 from glue.ligolw import utils as ligolw_utils
91 from glue.ligolw.utils import segments as ligolw_segments
92 from glue.ligolw.utils import process as ligolw_process
93 from gstlal import bottle
94 from gstlal import datasource
95 from gstlal import multirate_datasource
96 from gstlal import pipeio
97 from gstlal import pipeparts
98 from gstlal import simplehandler
99 from pylal import series as lalseries
100 from pylal.datatypes import LIGOTimeGPS
101 
102 
103 #
104 # =============================================================================
105 #
106 # Pipeline Elements
107 #
108 # =============================================================================
109 #
110 
111 ##
112 # A "sum-of-squares" aggregator
113 #
114 # _Gstreamer graph describing this function:_
115 #
116 # @dot
117 # digraph G {
118 # rankdir="LR";
119 #
120 # // nodes
121 # node [shape=box, style=rounded];
122 #
123 # lal_adder
124 # lal_peak [URL="\ref pipeparts.mkpeak()", style=filled, color=grey, label="lal_peak\niff control_peak_samples > 0"];
125 # capsfilter [URL = "\ref pipeparts.mkcapsfilter()"];
126 # "mksegmentsrcgate()" [URL="\ref datasource.mksegmentsrcgate()"];
127 # tee [URL="\ref pipeparts.mktee()"];
128 # lal_checktimestamps [URL="\ref pipeparts.mkchecktimestamps()"];
129 #
130 # // connections
131 # "? sink 1" -> lal_adder;
132 # "? sink 2" -> lal_adder;
133 # "? sink N" -> lal_adder;
134 # lal_adder -> capsfilter;
135 # capsfilter -> lal_peak;
136 # lal_peak -> "mksegmentsrcgate()";
137 # "mksegmentsrcgate()" -> lal_checktimestamps;
138 # lal_checktimestamps -> tee;
139 # tee -> "? src";
140 # }
141 # @enddot
142 #
143 #
def mkcontrolsnksrc(pipeline, rate, verbose = False, suffix = None, reconstruction_segment_list = None, seekevent = None, control_peak_samples = None):
	"""!
	Build the sum-of-squares control branch of the pipeline:  an adder
	whose sink pads accept the per-slice sum-of-squares streams,
	followed by an optional peak finder and an optional segment gate,
	terminating in a tee from which the aggregate control signal can be
	tapped.

	@param pipeline A reference to the gstreamer pipeline in which to add this graph
	@param rate An integer representing the target sample rate of the resulting src
	@param verbose Make verbose
	@param suffix Log name for verbosity
	@param reconstruction_segment_list A segment list object that describes when the control signal should be on.  This can be useful in e.g., only reconstructing physical SNRS around the time of injections, which can save an enormous amount of CPU time.
	@param seekevent A seek event such as what would be created by seek_event_for_gps()
	@param control_peak_samples If nonzero, do peak finding on the control signal over a window of this many samples.  The peak finding gives a single sample of "on" state at the peak, causing far less CPU to be used if you only want to reconstruct SNR around the peak of the control signal.

	@return A (sink, source) pair:  the adder element to which control
	streams should be linked, and the tee providing the aggregated
	control stream.
	"""
	#
	# adder followed by a caps filter pinning the stream to the
	# requested sample rate
	#

	adder = pipeparts.mkadder(pipeline, None)
	head = pipeparts.mkcapsfilter(pipeline, adder, "audio/x-raw-float, rate=%d" % rate)

	#
	# optional peak finder on the summed control signal
	#

	if control_peak_samples > 0:
		head = pipeparts.mkpeak(pipeline, head, control_peak_samples)

	#
	# optionally gate the control stream so SNR is only reconstructed
	# within the requested segments (e.g., around injections)
	#
	# FIXME: set the names of these gates so their segments can be
	# collected later? or else propagate this segment list into the
	# output some other way.
	#

	if reconstruction_segment_list is not None:
		head = datasource.mksegmentsrcgate(pipeline, head, reconstruction_segment_list, seekevent = seekevent, invert_output = False)

	#
	# verbosity, timestamp checking, and a tee
	#

	logname = "_%s" % suffix if suffix else ""
	if verbose:
		head = pipeparts.mkprogressreport(pipeline, head, "progress_sumsquares%s" % logname)
	head = pipeparts.mkchecktimestamps(pipeline, head, "timestamps%s_sumsquares" % logname)
	head = pipeparts.mktee(pipeline, head)

	#
	# hand the adder (sink) and the tee (source) back to the caller
	#

	return adder, head
197 
198 
class Handler(simplehandler.Handler):
	"""!
	A subclass of simplehandler.Handler to be used with e.g.,
	gstlal_inspiral

	Implements additional message handling for dealing with spectrum
	messages and checkpoints for the online analysis including periodic
	dumps of segment information, trigger files and background
	distribution statistics.
	"""
	def __init__(self, mainloop, pipeline, dataclass, instruments, tag = "", seglistdict = None, verbose = False):
		"""!
		@param mainloop The main application's event loop
		@param pipeline The gstreamer pipeline that is being controlled by this handler
		@param dataclass An inspiral.Data class instance
		@param instruments An iterable of instrument names (e.g., "H1") for which segment lists will be collected
		@param tag The tag to use for naming file snapshots, e.g. the description will be "%s_LLOID" % tag
		@param seglistdict An optional pre-existing segmentlistdict to use as the "triggersegments" entry; if None an empty one is created
		@param verbose Be verbose
		"""
		super(Handler, self).__init__(mainloop, pipeline)

		# the inspiral.Data instance whose lock serializes all
		# segment and trigger book-keeping done by this handler
		self.dataclass = dataclass

		self.tag = tag
		self.verbose = verbose

		# setup segment list collection from gates
		#
		# FIXME: knowledge of what gates are present in what
		# configurations, what they are called, etc., somehow needs to live
		# with the code that constructs the gates
		#
		# FIXME: in the offline pipeline, state vector segments
		# don't get recorded. however, except for the h(t) gate
		# segments these are all inputs to the pipeline so it
		# probably doesn't matter. nevertheless, they maybe should
		# go into the event database for completeness of that
		# record, or maybe not because it could result in a lot of
		# duplication of on-disk data. who knows. think about it.
		#
		# maps segment type name --> suffix of the lal_gate
		# element's name in the pipeline that defines it
		gate_suffix = {
			# FIXME: datasegments temporarily disabled until
			# we decide how to deal with livetime in dag (which
			# is currently adding its own datasegments lists to
			# the final database)
			#"datasegments": "frame_segments_gate",
			"statevectorsegments": "state_vector_gate",
			"whitehtsegments": "ht_gate"
		}

		# dictionary mapping segtype to segmentlist dictionary
		# mapping instrument to segment list
		self.seglistdicts = dict((segtype, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)) for segtype in gate_suffix)
		# add a "triggersegments" entry
		if seglistdict is None:
			self.seglistdicts["triggersegments"] = segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)
		else:
			self.seglistdicts["triggersegments"] = seglistdict
		# hook the Data class's livetime record keeping into ours
		# so that all segments come here
		# FIXME: don't do this, get rid of the Data class
		dataclass.seglistdicts = self.seglistdicts

		# state of segments being collected:  maps (segtype,
		# instrument) --> start time of the currently-open segment.
		# a key is present only while that gate is in the "on" state
		self.current_segment_start = {}

		# iterate over segment types and instruments, look for the
		# gate element that should provide those segments, and
		# connect handlers to collect the segments
		if verbose:
			print >>sys.stderr, "connecting segment handlers to gates ..."
		for segtype, seglistdict in self.seglistdicts.items():
			for instrument in seglistdict:
				try:
					name = "%s_%s" % (instrument, gate_suffix[segtype])
				except KeyError:
					# this segtype doesn't come from
					# gate elements
					continue
				elem = self.pipeline.get_by_name(name)
				if elem is None:
					# ignore missing gate elements
					if verbose:
						print >>sys.stderr, "\tcould not find %s for %s '%s'" % (name, instrument, segtype)
					continue
				if verbose:
					print >>sys.stderr, "\tfound %s for %s '%s'" % (name, instrument, segtype)
				# record state transitions via the gate's
				# "start"/"stop" signals; the extra tuple is
				# passed through to gatehandler()
				elem.connect("start", self.gatehandler, (segtype, instrument, "on"))
				elem.connect("stop", self.gatehandler, (segtype, instrument, "off"))
				elem.set_property("emit-signals", True)
		if verbose:
			print >>sys.stderr, "... done connecting segment handlers to gates"

		# most recent spectrum reported by each whitener
		self.psds = {}
		bottle.route("/psds.xml")(self.web_get_psd_xml)

		# segment lists
		bottle.route("/segments.xml")(self.web_get_segments_xml)

	def do_on_message(self, bus, message):
		"""!
		Override the on_message method of simplehandler to handle
		additional message types, e.g., spectrum and checkpointing messages.

		@param bus A reference to the pipeline's bus
		@param message A reference to the incoming message

		Returns True if the message was handled here, False to let the
		base class's handler see it.
		"""
		if message.type == gst.MESSAGE_ELEMENT:
			if message.structure.get_name() == "spectrum":
				# get the instrument, psd, and timestamp.
				# NOTE: epoch is used for the timestamp, this
				# is the middle of the most recent FFT interval
				# used to obtain this PSD
				instrument = message.src.get_name().split("_")[-1]
				psd = pipeio.parse_spectrum_message(message)
				timestamp = psd.epoch

				# save
				self.psds[instrument] = psd

				# update horizon distance history
				#
				# FIXME: probably need to compute these for a
				# bunch of masses. which ones?
				self.dataclass.record_horizon_distance(instrument, timestamp, psd, m1 = 1.4, m2 = 1.4)
				return True
		elif message.type == gst.MESSAGE_APPLICATION:
			if message.structure.get_name() == "CHECKPOINT":
				# FIXME make a custom parser for CHECKPOINT messages?
				timestamp = message.structure["timestamp"]
				self.checkpoint(timestamp)
				return True
		elif message.type == gst.MESSAGE_EOS:
			with self.dataclass.lock:
				# FIXME: how to choose correct timestamp?
				try:
					timestamp = self.seglistdicts["triggersegments"].extent_all()[1].ns()
				except ValueError:
					# no segments
					return False
				self.close_segments(timestamp)
			return False
		return False

	def _close_segments(self, timestamp):
		# close out existing segments. the code in the loop
		# modifies the iteration target, so iterate over a copy
		for (segtype, instrument), start_time in list(self.current_segment_start.items()):
			# skip segments that start in the future
			if timestamp < start_time.ns():
				continue
			# By construction these gates should be in the on
			# state. We fake a state transition to off in
			# order to flush the segments
			self._gatehandler(None, timestamp, (segtype, instrument, "off"))
			# But we have to remember to put it back
			self._gatehandler(None, timestamp, (segtype, instrument, "on"))

	def close_segments(self, timestamp):
		"""!
		Record stop times for all open segments and start new ones.

		@param timestamp the time in nanoseconds at which to mark
		the boundary. If this precedes an open segment's start
		time, that segment is not closed.
		"""
		with self.dataclass.lock:
			self._close_segments(timestamp)

	def checkpoint(self, timestamp):
		"""!
		Checkpoint, e.g., flush segments and triggers to disk.

		@param timestamp the gstreamer timestamp in nanoseconds of the current buffer in order to close off open segment intervals before writing to disk
		"""
		self.flush_segments_to_disk(timestamp)
		try:
			self.dataclass.snapshot_output_file("%s_LLOID" % self.tag, "xml.gz", verbose = self.verbose)
		except TypeError as te:
			print >>sys.stderr, "Warning: couldn't build output file on checkpoint, probably there aren't any triggers: %s" % te

	def flush_segments_to_disk(self, timestamp):
		"""!
		Flush segments to disk, e.g., when checkpointing or shutting
		down an online pipeline.

		@param timestamp the gstreamer timestamp in nanoseconds of the current buffer in order to close off open segment intervals before writing to disk
		"""
		with self.dataclass.lock:
			try:
				# close out existing segments.
				self._close_segments(timestamp)
				# overall extent and instrument set spanned by
				# all collected segment lists; used to build
				# the output file name
				ext = segments.segmentlist(seglistdict.extent_all() for seglistdict in self.seglistdicts.values()).extent()
				instruments = set(instrument for seglistdict in self.seglistdicts.values() for instrument in seglistdict)
				#FIXME integrate with the Data class snapshotting directories
				# directory named after the first 5 digits of
				# the GPS start time
				path = str(int(math.floor(ext[0])))[:5]
				try:
					os.mkdir(path)
				except OSError:
					# directory probably exists already;
					# any other failure will surface when
					# the file is written below
					pass
				fname = "%s/%s-%s_SEGMENTS-%d-%d.xml.gz" % (path, "".join(sorted(instruments)), self.tag, int(math.floor(ext[0])), int(math.ceil(ext[1])) - int(math.floor(ext[0])))
				ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None)

				# clear the segment lists in place
				for segtype, seglistdict in self.seglistdicts.items():
					# FIXME: we don't wipe the
					# triggersegments for now. the
					# online pipeline needs these to
					# accumulate forever, but that
					# might not be what it should be
					# doing, nor should these
					# necessarily be the segments it
					# uses for livetime. figure this out
					if segtype == "triggersegments":
						continue
					for seglist in seglistdict.values():
						del seglist[:]
			except ValueError:
				# extent_all() raises ValueError when a
				# segment list is empty; nothing to write
				print >>sys.stderr, "Warning: couldn't build segment list on checkpoint, probably there aren't any segments"

	def _gatehandler(self, elem, timestamp, (segtype, instrument, new_state)):
		# internal, lock-free implementation of gatehandler();
		# caller must hold self.dataclass.lock
		timestamp = LIGOTimeGPS(0, timestamp) # timestamp is in nanoseconds
		state_key = (segtype, instrument)

		if self.verbose and elem is not None:
			print >>sys.stderr, "%s: %s '%s' state transition: %s @ %s" % (elem.get_name(), instrument, segtype, new_state, str(timestamp))

		# if there is a current_segment_start for this then the
		# state transition has to be off
		if state_key in self.current_segment_start:
			self.seglistdicts[segtype][instrument] |= segments.segmentlist((segments.segment(self.current_segment_start.pop(state_key), timestamp),))
		if new_state == "on":
			self.current_segment_start[state_key] = timestamp
		else:
			assert new_state == "off"

	def gatehandler(self, elem, timestamp, (segtype, instrument, new_state)):
		"""!
		A handler that intercepts gate state transitions. This can set
		the "on" segments for each detector

		@param elem A reference to the lal_gate element or None (only used for verbosity)
		@param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds)
		@param segtype the class of segments this gate is defining, e.g., "datasegments", etc..
		@param instrument the instrument this state transtion is to be attributed to, e.g., "H1", etc..
		@param new_state the state transition, must be either "on" or "off"
		"""
		with self.dataclass.lock:
			self._gatehandler(elem, timestamp, (segtype, instrument, new_state))

	# NOTE(review): the two "def" lines below were missing from the
	# extracted source (internal lines 447 and 461 of the dump); they
	# are reconstructed from the bottle.route() registrations in
	# __init__ and the methods' docstrings -- confirm against the
	# repository copy of this file
	def gen_segments_xmldoc(self):
		"""!
		A method to output the segment list in a valid ligolw xml
		format.
		"""
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
		with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments:
			for segtype, seglistdict in self.seglistdicts.items():
				ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot")
		ligolw_process.set_process_end_time(process)
		return xmldoc

	def web_get_segments_xml(self):
		"""!
		provide a bottle route to get segment information via a url
		"""
		with self.dataclass.lock:
			output = StringIO.StringIO()
			ligolw_utils.write_fileobj(self.gen_segments_xmldoc(), output, trap_signals = None)
			outstr = output.getvalue()
			output.close()
		return outstr

	def gen_psd_xmldoc(self):
		"""!
		Build a ligolw xml document containing the most recent PSD
		from each whitener.
		"""
		xmldoc = lalseries.make_psd_xmldoc(self.psds)
		process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
		ligolw_process.set_process_end_time(process)
		return xmldoc

	def web_get_psd_xml(self):
		"""!
		provide a bottle route to get PSD information via a url
		"""
		with self.dataclass.lock:
			output = StringIO.StringIO()
			ligolw_utils.write_fileobj(self.gen_psd_xmldoc(), output, trap_signals = None)
			outstr = output.getvalue()
			output.close()
		return outstr
485 
486 
487 ##
488 # _Gstreamer graph describing this function:_
489 #
490 # @dot
491 # digraph G {
492 # rankdir="LR";
493 #
494 # // nodes
495 # node [shape=box, style=rounded];
496 #
497 # lal_firbank [URL="\ref pipeparts.mkfirbank()"];
498 # lal_checktimestamps1 [URL="\ref pipeparts.mkchecktimestamps()"];
499 # lal_checktimestamps2 [URL="\ref pipeparts.mkchecktimestamps()"];
500 # queue [URL="\ref pipeparts.mkqueue()"];
501 # matrixmixer [URL="\ref pipeparts.mkmatrixmixer()", label="lal_matrixmixer\niff bank.mix_matrix", style=filled, color=grey];
502 #
503 # "? sink" -> lal_firbank;
504 # lal_firbank -> lal_checktimestamps1;
505 #
506 # // without control
507 #
508 # lal_checktimestamps1 -> queue [label="iff control_snk, control_src are None"];
509 # queue -> matrixmixer;
510 # matrixmixer -> lal_checktimestamps2;
511 # lal_checktimestamps2 -> "? src";
512 #
513 # // with control
514 #
515 # tee [URL="\ref pipeparts.mktee()"];
516 # queue2 [URL="\ref pipeparts.mkqueue()"];
517 # queue3 [URL="\ref pipeparts.mkqueue()"];
518 # queue4 [URL="\ref pipeparts.mkqueue()"];
519 # lal_checktimestamps3 [URL="\ref pipeparts.mkchecktimestamps()"];
520 # lal_checktimestamps4 [URL="\ref pipeparts.mkchecktimestamps()"];
521 # lal_checktimestamps5 [URL="\ref pipeparts.mkchecktimestamps()"];
522 # capsfilter [URL="\ref pipeparts.mkcapsfilter()"];
523 # lal_nofakedisconts [URL="\ref pipeparts.mknofakedisconts()"];
524 # gate [URL="\ref pipeparts.mkgate()"];
525 # "mkcontrolsnksrc()" [URL="\ref mkcontrolsnksrc()"];
526 # lal_sumsquares [URL="\ref pipeparts.mksumsquares()"];
527 # audioresample [URL="\ref pipeparts.mkresample()"];
528 #
529 #
530 # lal_checktimestamps1 -> tee [label="iff control_snk, control_src are not None"];
531 # tee -> lal_sumsquares -> queue2;
532 # queue2 -> lal_checktimestamps3;
533 # lal_checktimestamps3 -> audioresample;
534 # audioresample -> capsfilter;
535 # capsfilter -> lal_nofakedisconts;
536 # lal_nofakedisconts -> lal_checktimestamps4;
537 # lal_checktimestamps4 -> "mkcontrolsnksrc()"
538 # "mkcontrolsnksrc()" -> queue3;
539 # queue3 -> gate;
540 # tee -> queue4 -> gate;
541 # gate -> lal_checktimestamps5;
542 # lal_checktimestamps5 -> matrixmixer;
543 #
544 # }
545 # @enddot
546 #
547 #
def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src), gate_attack_length, gate_hold_length, block_duration, nxydump_segment = None, fir_stride = None, control_peak_time = None):
	"""!
	Make a single slice of one branch of the lloid graph, e.g. one instrument and one
	template bank fragment. For details see: http://arxiv.org/abs/1107.2665

	Specifically this implements the filtering of multirate svd basis and
	(conditional) resampling and reconstruction of the physical SNR

	@param pipeline The gstreamer pipeline in which to place this graph
	@param src The source of data for this graph provided by a gstreamer element
	@param bank The template bank class
	@param bank_fragment The specific fragment (time slice) of the template bank in question
	@param (control_snk, control_src) An optional tuple of the sink and source elements for a graph that will construct a control time series for the gate which aggregates the orthogonal snrs from each template slice. This is used to conditionally reconstruct the physical SNR of interesting times
	@param gate_attack_length The attack length in samples for the lal_gate element that controls the reconstruction of physical SNRs
	@param gate_hold_length The hold length in samples for the lal_gate element that controls the reconstruction of physical SNRs
	@param block_duration The characteristic buffer size that is passed around, which is useful for constructing queues.
	@param nxydump_segment Not used
	@param fir_stride The target length of output buffers from lal_firbank. Directly effects latency. Making this short will force time-domain convolution. Otherwise FFT convolution will be done to save CPU cycles, but at higher latency.
	@param control_peak_time The window over which to find peaks in the control signal. Shorter windows increase computational cost but probably also detection efficiency.

	@return The source element for the stream of (possibly reconstructed)
	SNRs for this bank fragment.
	"""
	logname = "%s_%.2f.%.2f" % (bank.logname, bank_fragment.start, bank_fragment.end)

	#
	# FIR filter bank. low frequency branches use time-domain
	# convolution, high-frequency branches use FFT convolution with a
	# block stride given by fir_stride.
	#

	# negative latency shifts the filter output timestamps so that each
	# output sample is labelled with the time of the template slice it
	# corresponds to (bank_fragment.start is the slice's offset in s)
	latency = -int(round(bank_fragment.start * bank_fragment.rate))
	block_stride = fir_stride * bank_fragment.rate

	# we figure an fft costs ~5 logN flops where N is duration + block
	# stride. Time domain costs N * block_stride. So if block stride is
	# less than about 5logN you might as well do time domain filtering
	# FIXME This calculation should probably be made more rigorous
	time_domain = 5 * numpy.log2((bank_fragment.end - bank_fragment.start) * bank_fragment.rate + block_stride) > block_stride

	src = pipeparts.mkfirbank(pipeline, src, latency = latency, fir_matrix = bank_fragment.orthogonal_template_bank, block_stride = block_stride, time_domain = time_domain)
	src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_firbank" % logname)
	# uncomment reblock if you ever use really big ffts and want to cut them down a bit
	#src = pipeparts.mkreblock(pipeline, src, block_duration = control_peak_time * gst.SECOND)
	#src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_firbank_reblock" % logname)
	#src = pipeparts.mktee(pipeline, src) # comment-out the tee below if this is uncommented
	#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, src), "orthosnr_%s.dump" % logname, segment = nxydump_segment)

	#pipeparts.mkvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkhistogramplot(pipeline, src), "video/x-raw-rgb, width=640, height=480, framerate=1/4"))
	#pipeparts.mkogmvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkchannelgram(pipeline, pipeparts.mkqueue(pipeline, src), plot_width = .125), "video/x-raw-rgb, width=640, height=480, framerate=64/1"), "orthosnr_channelgram_%s.ogv" % logname, verbose = True)

	#
	# compute weighted sum-of-squares, feed to sum-of-squares
	# aggregator
	#

	if control_snk is not None and control_src is not None:
		src = pipeparts.mktee(pipeline, src) # comment-out if the tee above is uncommented
		# branch off the orthogonal SNR stream, square and sum it,
		# and resample to the control signal's rate before linking
		# it into the aggregator's adder (control_snk)
		elem = pipeparts.mkqueue(pipeline, pipeparts.mksumsquares(pipeline, src, weights = bank_fragment.sum_of_squares_weights), max_size_buffers = 0, max_size_bytes = 0, max_size_time = block_duration)
		elem = pipeparts.mkchecktimestamps(pipeline, elem, "timestamps_%s_after_sumsquare" % logname)
		# FIXME: the capsfilter shouldn't be needed, the adder
		# should intersect it's downstream peer's format with the
		# sink format
		elem = pipeparts.mkcapsfilter(pipeline, pipeparts.mkresample(pipeline, elem, quality = 9), "audio/x-raw-float, rate=%d" % max(bank.get_rates()))
		elem = pipeparts.mknofakedisconts(pipeline, elem) # FIXME: remove when resampler is patched
		elem = pipeparts.mkchecktimestamps(pipeline, elem, "timestamps_%s_after_sumsquare_resampler" % logname)
		elem.link(control_snk)

		#
		# use sum-of-squares aggregate as gate control for orthogonal SNRs
		#
		# FIXME This queue has to be large for the peak finder on the control
		# signal if that element gets smarter maybe this could be made smaller
		# It should be > 1 * control_peak_time * gst.SECOND + 4 * block_duration
		#
		# FIXME for an unknown reason there needs to be an extra large
		# queue in this part of the pipeline in order to prevent
		# lock-ups. Fortunately this is buffering up relatively
		# lightweight data, i.e., before reconstruction
		#
		# FIXME since peakfinding is done, or control is based on
		# injections only, we ignore the bank.gate_threshold parameter
		# and just use 1e-100

		src = pipeparts.mkgate(
			pipeline,
			pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * (2 * control_peak_time + (abs(gate_attack_length) + abs(gate_hold_length)) / bank_fragment.rate) * gst.SECOND + 12 * block_duration),
			threshold = 1e-100,
			attack_length = gate_attack_length,
			hold_length = gate_hold_length,
			control = pipeparts.mkqueue(pipeline, control_src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * (2 * control_peak_time + (abs(gate_attack_length) + abs(gate_hold_length)) / bank_fragment.rate) * gst.SECOND + 12 * block_duration)
		)
		src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_gate" % logname)
	else:
		#
		# buffer orthogonal SNRs / or actual SNRs if SVD is not
		# used. the queue upstream of the sum-of-squares gate
		# plays this role if that feature has been enabled
		#
		# FIXME: teach the collectpads object not to wait for
		# buffers on pads whose segments have not yet been reached
		# by the input on the other pads. then this large queue
		# buffer will not be required because streaming can begin
		# through the downstream adders without waiting for input
		# from all upstream elements.
		#

		src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * block_duration)

	#
	# reconstruct physical SNRs
	#

	if bank_fragment.mix_matrix is not None:
		src = pipeparts.mkmatrixmixer(pipeline, src, matrix = bank_fragment.mix_matrix)
		src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_matrixmixer" % logname)

	#
	# done
	#
	# FIXME: find a way to use less memory without this hack

	del bank_fragment.orthogonal_template_bank
	del bank_fragment.sum_of_squares_weights
	del bank_fragment.mix_matrix

	return src
671 
672 
def mkLLOIDhoftToSnrSlices(pipeline, hoftdict, bank, control_snksrc, block_duration, verbose = False, logname = "", nxydump_segment = None, fir_stride = None, control_peak_time = None, snrslices = None):
	"""!
	Build the pipeline fragment that creates the SnrSlices associated with
	different sample rates from hoft.

	@param pipeline the gstreamer pipeline the fragment's elements are added to
	@param hoftdict mapping from sample rate to the head of a (whitened,
	down-sampled) h(t) stream at that rate; one entry is consumed per
	bank fragment rate
	@param bank template bank object; its get_rates(), bank_fragments,
	filter_length and autocorrelation_bank attributes are read here
	@param control_snksrc (sink, src) pair for the sum-of-squares control
	stream, passed through unmodified to mkLLOIDbranch()
	@param block_duration used to size the queues feeding the adder
	network (gstreamer time units)
	@param fir_stride used to size the input queue ahead of each branch
	@param snrslices if not None, this dictionary is populated with a
	tee'd (real-valued, toggled-complex) copy of the summed SNR stream
	at each sample rate as a side effect

	Returns the head of the summed SNR stream at the highest sample
	rate, wrapped in a togglecomplex element.
	"""
	#
	# parameters
	#

	rates = sorted(bank.get_rates())
	output_rate = max(rates)

	# work out the upsample factors for the attack and hold calculations below
	# NOTE: this is Python 2 integer division; adjacent rates are assumed
	# to divide evenly (e.g. powers of two)
	upsample_factor = dict((rate, rates[i+1] / rate) for i, rate in list(enumerate(rates))[:-1])
	# the top rate is never upsampled, so its resampler contribution is 0
	upsample_factor[output_rate] = 0

	autocorrelation_length = bank.autocorrelation_bank.shape[1]
	assert autocorrelation_length % 2 == 1
	# integer division is exact here because the length is odd (asserted
	# above); latency is negative = samples of look-back
	autocorrelation_latency = -(autocorrelation_length - 1) / 2

	#
	# loop over template bank slices
	#

	# branch_heads maps sample rate -> set of stream heads awaiting
	# summation at that rate
	branch_heads = dict((rate, set()) for rate in rates)
	for bank_fragment in bank.bank_fragments:
		# The attack and hold width has three parts
		#
		# 1) The audio resampler filter: 16 comes from the size of
		# the audioresampler filter in samples for the next highest
		# rate at quality 1. Note it must then be converted to the size
		# at the current rate using the upsample_factor dictionary
		# (which is 0 if you are at the max rate).
		#
		# 2) The chisq latency. You must have at least latency number
		# of points before and after (at the maximum sample rate) to
		# compute the chisq
		#
		# 3) A fudge factor to get the width of the peak. FIXME this
		# is just set to 1/8th of a second
		peak_half_width = upsample_factor[bank_fragment.rate] * 16 + int(math.ceil(-autocorrelation_latency * (float(bank_fragment.rate) / output_rate))) + int(math.ceil(bank_fragment.rate / 8.))
		branch_heads[bank_fragment.rate].add(mkLLOIDbranch(
			pipeline,
			# FIXME: the size isn't ideal: the correct value
			# depends on how much data is accumulated in the
			# firbank element, and the value here is only
			# approximate and not tied to the fir bank
			# parameters so might not work if those change
			pipeparts.mkqueue(pipeline, pipeparts.mkdrop(pipeline, hoftdict[bank_fragment.rate], int(round((bank.filter_length - bank_fragment.end) * bank_fragment.rate))), max_size_bytes = 0, max_size_buffers = 0, max_size_time = (1 * fir_stride + int(math.ceil(bank.filter_length))) * gst.SECOND),
			bank,
			bank_fragment,
			control_snksrc,
			peak_half_width,
			peak_half_width,
			block_duration,
			nxydump_segment = nxydump_segment,
			fir_stride = fir_stride,
			control_peak_time = control_peak_time
		))

	#
	# if the calling code has requested copies of the snr
	# slices, sum together the highest sample rate streams and tee
	# them off here. this needs to be done before constructing the
	# adder network below in order to have access to this snr slice by
	# itself. if we put this off until later it'll have all the other
	# snrs added into it before we get a chance to tee it off
	#

	if snrslices is not None:
		rate, heads = output_rate, branch_heads[output_rate]
		if len(heads) > 1:
			#
			# this sample rate has more than one snr stream.
			# sum them together in an adder, which becomes the
			# head of the stream at this sample rate
			#

			branch_heads[rate] = pipeparts.mkadder(pipeline, (pipeparts.mkqueue(pipeline, head, max_size_bytes = 0, max_size_buffers = 0, max_size_time = block_duration) for head in heads))
		else:
			#
			# this sample rate has only one stream. it's the
			# head of the stream at this sample rate
			#

			branch_heads[rate], = heads
		# tee so the caller's copy and the adder network below can
		# both consume this stream
		branch_heads[rate] = pipeparts.mktee(pipeline, branch_heads[rate])
		snrslices[rate] = pipeparts.mktogglecomplex(pipeline, branch_heads[rate])

		#
		# the code below expects an interable of elements
		#

		branch_heads[rate] = set([branch_heads[rate]])

	#
	# sum the snr streams
	#

	if True: # FIXME: make conditional on time-slice \chi^{2}
		# chain each rate to the next higher rate, forming a
		# ladder of adders ending at output_rate
		next_rate = dict(zip(rates, rates[1:]))
	else:
		# resample everything straight to the output rate
		next_rate = dict((rate, output_rate) for rate in rates if rate != output_rate)

	for rate, heads in sorted(branch_heads.items()):
		if len(heads) > 1:
			#
			# this sample rate has more than one snr stream.
			# sum them together in an adder, which becomes the
			# head of the stream at this sample rate
			#

			branch_heads[rate] = pipeparts.mkadder(pipeline, (pipeparts.mkqueue(pipeline, head, max_size_bytes = 0, max_size_buffers = 0, max_size_time = 1 * block_duration) for head in heads))
			# FIXME capsfilter shouldn't be needed remove when adder is fixed
			branch_heads[rate] = pipeparts.mkcapsfilter(pipeline, branch_heads[rate], "audio/x-raw-float, rate=%d" % rate)
			branch_heads[rate] = pipeparts.mkchecktimestamps(pipeline, branch_heads[rate], "timestamps_%s_after_%d_snr_adder" % (logname, rate))
		else:
			#
			# this sample rate has only one stream. it's the
			# head of the stream at this sample rate
			#

			branch_heads[rate], = heads

		#
		# resample if needed
		#

		if rate in next_rate:
			# Note quality = 1 requires that the template slices
			# are padded such that the Nyquist frequency is 1.5
			# times the highest frequency of the time slice
			branch_heads[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkresample(pipeline, branch_heads[rate], quality = 1), "audio/x-raw-float, rate=%d" % next_rate[rate])
			branch_heads[rate] = pipeparts.mknofakedisconts(pipeline, branch_heads[rate]) # FIXME: remove when resampler is patched
			branch_heads[rate] = pipeparts.mkchecktimestamps(pipeline, branch_heads[rate], "timestamps_%s_after_%d_to_%d_snr_resampler" % (logname, rate, next_rate[rate]))

		#
		# if the calling code has requested copies of the snr
		# slices, tee that off here. remember that we've already
		# got the highest sample rate slice from above
		#

		if snrslices is not None and rate != output_rate:
			branch_heads[rate] = pipeparts.mktee(pipeline, branch_heads[rate])
			snrslices[rate] = pipeparts.mktogglecomplex(pipeline, branch_heads[rate])

		#
		# chain to next adder if this isn't the final answer
		#

		if rate in next_rate:
			# pop this rate so that, once the loop completes,
			# only the fully-summed output_rate stream remains
			branch_heads[next_rate[rate]].add(branch_heads.pop(rate))

	#
	# done
	#

	snr, = branch_heads.values()	# make sure we've summed down to one stream
	return pipeparts.mktogglecomplex(pipeline, snr)
	#return pipeparts.mkcapsfilter(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkcapsfilter(pipeline, snr, "audio/x-raw-float, rate=%d" % output_rate)), "audio/x-raw-complex, rate=%d" % output_rate)
833 
834 
def mkLLOIDSnrSlicesToTimeSliceChisq(pipeline, branch_heads, bank, block_duration):
	"""!
	Build pipeline fragment that computes the TimeSliceChisq from SnrSlices.

	@param pipeline the gstreamer pipeline the fragment's elements are added to
	@param branch_heads mapping from sample rate to the head of the SNR
	slice stream at that rate; each stream is linked (in ascending rate
	order) to the lal_timeslicechisq element
	@param bank template bank object; bank_fragments supplies the
	per-fragment chifacs arrays
	@param block_duration queue size (gstreamer time units) for the
	queues on either side of the chisq element

	Returns the head of a queue carrying the lal_timeslicechisq output.
	"""
	#
	# parameters
	#

	rates = sorted(bank.get_rates())

	#
	# compute the chifacs for each rate, store in ascending order in rate
	#

	chifacsdict = dict((rate, []) for rate in rates)
	for bank_fragment in bank.bank_fragments:
		chifacsdict[bank_fragment.rate].append(bank_fragment.chifacs)
	chifacs = []
	for rate, facs in sorted(chifacsdict.items()):
		# sum the even-indexed and odd-indexed entries of every
		# chifacs array at this rate, then average the two halves.
		# NOTE(review): the even/odd interleave presumably separates
		# the real and imaginary template components -- confirm
		# against the lal_timeslicechisq element's documentation
		chifacs.append(facs[0][0::2])
		chifacs[-1] += facs[0][1::2]
		for fac in facs[1:]:
			chifacs[-1] += fac[0::2]
			chifacs[-1] += fac[1::2]
		chifacs[-1] /= 2.

	#
	# create timeslicechisq element and add chifacs as a property
	#

	chisq = gst.element_factory_make("lal_timeslicechisq")
	pipeline.add(chisq)

	#
	# link the snrslices to the timeslicechisq element in ascending order in rate
	#

	for rate, snrslice in sorted(branch_heads.items()):
		pipeparts.mkqueue(pipeline, snrslice, max_size_bytes = 0, max_size_buffers = 0, max_size_time = block_duration).link(chisq)

	#
	# set chifacs-matrix property, needs to be done after snrslices are linked in
	#

	chisq.set_property("chifacs-matrix", chifacs)

	return pipeparts.mkqueue(pipeline, chisq, max_size_bytes = 0, max_size_buffers = 0, max_size_time = block_duration)
882 
883 
def mkLLOIDSnrChisqToTriggers(pipeline, snr, chisq, bank, verbose = False, nxydump_segment = None, logname = ""):
	"""!
	Build pipeline fragment that converts single detector SNR and Chisq
	into triggers.
	"""
	#
	# construct the trigger generator
	#

	trigger_src = pipeparts.mktriggergen(
		pipeline,
		snr,
		chisq,
		template_bank_filename = bank.template_bank_filename,
		snr_threshold = bank.snr_threshold,
		sigmasq = bank.sigmasq
	)
	# FIXME: add ability to choose this
	# "lal_blcbctriggergen", {"bank-filename": bank.template_bank_filename, "snr-thresh": bank.snr_threshold, "sigmasq": bank.sigmasq}

	#
	# optionally attach a progress report and hand back the head of
	# the fragment
	#

	if not verbose:
		return trigger_src
	return pipeparts.mkprogressreport(pipeline, trigger_src, "progress_xml_%s" % logname)
904 
905 
906 #
907 # many instruments, many template banks
908 #
909 
910 
def mkLLOIDmulti(pipeline, detectors, banks, psd, psd_fft_length = 8, ht_gate_threshold = float("inf"), veto_segments = None, verbose = False, nxydump_segment = None, chisq_type = 'autochisq', track_psd = False, fir_stride = 16, control_peak_time = 2, block_duration = gst.SECOND, reconstruction_segment_list = None):
	"""!
	The multiple instrument, multiple bank LLOID algorithm

	@param pipeline the gstreamer pipeline the elements are added to
	@param detectors data-source description object; channel_dict and
	seekevent are read from it here
	@param banks dictionary mapping instrument name to a list of template
	bank objects; bank IDs must be unique within each instrument's list
	@param psd dictionary mapping instrument name to a reference PSD
	@param chisq_type must be 'autochisq' or 'timeslicechisq' (only
	'autochisq' is implemented in the trigger-generation stage below)
	@param veto_segments if not None, a dictionary mapping instrument
	name to the veto segment list passed to the whitener chain
	@param control_peak_time if > 0 (or reconstruction_segment_list is
	not None), per-(instrument, bank) sum-of-squares control branches
	are constructed

	Returns a dictionary mapping instrument name to a set of trigger
	source elements.
	"""

	#
	# check for unrecognized chisq_types, non-unique bank IDs
	#

	if chisq_type not in ['autochisq', 'timeslicechisq']:
		raise ValueError("chisq_type must be either 'autochisq' or 'timeslicechisq', given %s" % chisq_type)
	if any(tuple(iterutils.nonuniq(bank.bank_id for bank in banklist)) for banklist in banks.values()):
		raise ValueError("bank IDs are not unique: %s" % "; ".join("for %s: %s" % (instrument, iterutils.nonuniq(bank.bank_id for bank in banklist)) for instrument, banklist in banks.items()))

	#
	# construct dictionaries of whitened, conditioned, down-sampled
	# h(t) streams
	#

	hoftdicts = {}
	for instrument in detectors.channel_dict:
		# pool the rates needed by every bank for this instrument so
		# a single multirate source can serve them all
		rates = set(rate for bank in banks[instrument] for rate in bank.get_rates())
		src = datasource.mkbasicsrc(pipeline, detectors, instrument, verbose)
		hoftdicts[instrument] = multirate_datasource.mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = psd[instrument], psd_fft_length = psd_fft_length, ht_gate_threshold = ht_gate_threshold, veto_segments = veto_segments[instrument] if veto_segments is not None else None, seekevent = detectors.seekevent, nxydump_segment = nxydump_segment, track_psd = track_psd, zero_pad = 0, width = 32)

	#
	# build gate control branches
	#

	if control_peak_time > 0 or reconstruction_segment_list is not None:
		control_branch = {}
		for instrument, bank in [(instrument, bank) for instrument, banklist in banks.items() for bank in banklist]:
			suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
			# H2 gets no control branch of its own; it reuses
			# H1's control stream below
			if instrument != "H2":
				control_branch[(instrument, bank.bank_id)] = mkcontrolsnksrc(pipeline, max(bank.get_rates()), verbose = verbose, suffix = suffix, reconstruction_segment_list = reconstruction_segment_list, seekevent = detectors.seekevent, control_peak_samples = control_peak_time * max(bank.get_rates()))
				#pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control_branch[(instrument, bank.bank_id)][1]), "control_%s.dump" % suffix, segment = nxydump_segment)

	else:
		control_branch = None

	#
	# construct trigger generators
	#

	triggersrcs = dict((instrument, set()) for instrument in hoftdicts)
	for instrument, bank in [(instrument, bank) for instrument, banklist in banks.items() for bank in banklist]:
		suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
		if control_branch is not None:
			if instrument != "H2":
				control_snksrc = control_branch[(instrument, bank.bank_id)]
			else:
				# H2 shares H1's control src (no sink of its own)
				control_snksrc = (None, control_branch[("H1", bank.bank_id)][1])
		else:
			control_snksrc = (None, None)
		if chisq_type == 'timeslicechisq':
			snrslices = {}
		else:
			snrslices = None
		snr = mkLLOIDhoftToSnrSlices(
			pipeline,
			hoftdicts[instrument],
			bank,
			control_snksrc,
			block_duration,
			verbose = verbose,
			logname = suffix,
			nxydump_segment = nxydump_segment,
			control_peak_time = control_peak_time,
			fir_stride = fir_stride,
			snrslices = snrslices
		)
		snr = pipeparts.mkchecktimestamps(pipeline, snr, "timestamps_%s_snr" % suffix)
		# uncomment this tee if the diagnostic sinks below are
		# needed
		#snr = pipeparts.mktee(pipeline, snr)
		if chisq_type == 'autochisq':
			# FIXME don't hardcode
			# peak finding window (n) in samples is 1 second at max rate, ie max(rates)
			# NOTE(review): "rates" here is left over from the
			# h(t) loop above (the last instrument's pooled
			# rates), not this bank's rates -- confirm whether
			# max(bank.get_rates()) was intended
			head = pipeparts.mkitac(pipeline, snr, 1 * max(rates), bank.template_bank_filename, autocorrelation_matrix = bank.autocorrelation_bank, mask_matrix = bank.autocorrelation_mask, snr_thresh = bank.snr_threshold, sigmasq = bank.sigmasq)
			if verbose:
				head = pipeparts.mkprogressreport(pipeline, head, "progress_xml_%s" % suffix)
			triggersrcs[instrument].add(head)
		else:
			raise NotImplementedError("Currently only 'autochisq' is supported")
		# FIXME: find a way to use less memory without this hack
		del bank.autocorrelation_bank
		#pipeparts.mknxydumpsink(pipeline, pipeparts.mktogglecomplex(pipeline, pipeparts.mkqueue(pipeline, snr)), "snr_%s.dump" % suffix, segment = nxydump_segment)
		#pipeparts.mkogmvideosink(pipeline, pipeparts.mkcapsfilter(pipeline, pipeparts.mkchannelgram(pipeline, pipeparts.mkqueue(pipeline, snr), plot_width = .125), "video/x-raw-rgb, width=640, height=480, framerate=64/1"), "snr_channelgram_%s.ogv" % suffix, audiosrc = pipeparts.mkaudioamplify(pipeline, pipeparts.mkqueue(pipeline, hoftdict[max(bank.get_rates())], max_size_time = 2 * int(math.ceil(bank.filter_length)) * gst.SECOND), 0.125), verbose = True)

	#
	# done
	#

	assert any(triggersrcs.values())
	return triggersrcs
1005