81 gobject.threads_init()
87 from glue
import iterutils
88 from glue
import segments
89 from glue.ligolw
import ligolw
90 from glue.ligolw
import utils
as ligolw_utils
91 from glue.ligolw.utils
import segments
as ligolw_segments
92 from glue.ligolw.utils
import process
as ligolw_process
93 from gstlal
import bottle
94 from gstlal
import datasource
95 from gstlal
import multirate_datasource
96 from gstlal
import pipeio
97 from gstlal
import pipeparts
98 from gstlal
import simplehandler
99 from pylal
import series
as lalseries
100 from pylal.datatypes
import LIGOTimeGPS
def mkcontrolsnksrc(pipeline, rate, verbose = False, suffix = None, reconstruction_segment_list = None, seekevent = None, control_peak_samples = None):
	"""
	This function implements a portion of a gstreamer graph to provide a
	control signal for deciding when to reconstruct physical SNRS

	@param pipeline A reference to the gstreamer pipeline in which to add this graph
	@param rate An integer representing the target sample rate of the resulting src
	@param verbose Make verbose
	@param suffix Log name for verbosity
	@param reconstruction_segment_list A segment list object that describes when the control signal should be on.  This can be useful in e.g., only reconstructing physical SNRS around the time of injections, which can save an enormous amount of CPU time.
	@param seekevent A seek event such as what would be created by seek_event_for_gps()
	@param control_peak_samples If nonzero, this would do peakfinding on the control signal with the window specified by this parameter.  The peak finding would give a single sample of "on" state at the peak.  This will cause far less CPU to be used if you only want to reconstruct SNR around the peak of the control signal.
	"""
	#
	# the adder is the sink into which each template-slice branch feeds
	# its sum-of-squares stream;  a capsfilter pins the summed stream to
	# the requested sample rate
	#

	snk = pipeparts.mkadder(pipeline, None)
	src = pipeparts.mkcapsfilter(pipeline, snk, "audio/x-raw-float, rate=%d" % rate)

	#
	# optionally reduce the control signal to a single "on" sample at
	# each peak within the given window (saves CPU downstream)
	#

	if control_peak_samples > 0:
		src = pipeparts.mkpeak(pipeline, src, control_peak_samples)

	#
	# optionally gate the control signal so SNR is only reconstructed
	# during the requested segments
	#

	if reconstruction_segment_list is not None:
		src = datasource.mksegmentsrcgate(pipeline, src, reconstruction_segment_list, seekevent = seekevent, invert_output = False)

	logname = suffix and "_%s" % suffix or ""
	src = pipeparts.mkprogressreport(pipeline, src, "progress_sumsquares%s" % logname)
	src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps%s_sumsquares" % logname)
	src = pipeparts.mktee(pipeline, src)

	# return both halves:  callers attach branch outputs to snk and pull
	# the summed control stream from src (callers index this tuple, e.g.
	# control_branch[...][1]).  The return statement was lost in the
	# mangled extraction and is restored here.
	return snk, src
201 A subclass of simplehandler.Handler to be used with e.g.,
204 Implements additional message handling for dealing with spectrum
205 messages and checkpoints for the online analysis including periodic
206 dumps of segment information, trigger files and background
207 distribution statistics.
def __init__(self, mainloop, pipeline, dataclass, instruments, tag = "", seglistdict = None, verbose = False):
	"""
	@param mainloop The main application's event loop
	@param pipeline The gstreamer pipeline that is being controlled by this handler
	@param dataclass An inspiral.Data class instance
	@param tag The tag to use for naming file snapshots, e.g. the description will be "%s_LLOID" % tag
	@param verbose Be verbose
	"""
	super(Handler, self).__init__(mainloop, pipeline)
	# NOTE(review): the opening of the gate_suffix dict literal (and any
	# other entries) was lost in extraction;  only these two key/value
	# pairs survive.
	"statevectorsegments": "state_vector_gate",
	"whitehtsegments": "ht_gate"
	# one segmentlistdict per segment type, seeded with one empty
	# segmentlist per instrument
	self.seglistdicts = dict((segtype, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)) for segtype in gate_suffix)
	if seglistdict is None:
		self.seglistdicts["triggersegments"] = segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)
	print >>sys.stderr, "connecting segment handlers to gates ..."
	for segtype, seglistdict in self.seglistdicts.items():
		for instrument in seglistdict:
			# gate elements are named "<instrument>_<gate suffix>"
			name = "%s_%s" % (instrument, gate_suffix[segtype])
			elem = self.pipeline.get_by_name(name)
			# NOTE(review): the branch structure around these two
			# prints (presumably an `elem is None` check) was lost
			# in extraction -- confirm against the original file.
			print >>sys.stderr, "\tcould not find %s for %s '%s'" % (name, instrument, segtype)
			print >>sys.stderr, "\tfound %s for %s '%s'" % (name, instrument, segtype)
			# record this gate's state transitions as segment
			# boundaries for (segtype, instrument)
			elem.connect("start", self.gatehandler, (segtype, instrument, "on"))
			elem.connect("stop", self.gatehandler, (segtype, instrument, "off"))
			elem.set_property("emit-signals", True)
	print >>sys.stderr, "... done connecting segment handlers to gates"
	"""
	Override the on_message method of simplehandler to handle
	additional message types, e.g., spectrum and checkpointing messages.

	@param bus A reference to the pipeline's bus
	@param message A reference to the incoming message
	"""
	# NOTE(review): the enclosing `def` line was lost in extraction;
	# nesting between the surviving statements is partly inferred.
	if message.type == gst.MESSAGE_ELEMENT:
		if message.structure.get_name() == "spectrum":
			# whitener elements are named "..._<instrument>";  take
			# the last underscore-delimited token
			instrument = message.src.get_name().split("_")[-1]
			psd = pipeio.parse_spectrum_message(message)
			timestamp = psd.epoch
			# remember the most recent PSD per instrument
			self.psds[instrument] = psd
			self.dataclass.record_horizon_distance(instrument, timestamp, psd, m1 = 1.4, m2 = 1.4)
	elif message.type == gst.MESSAGE_APPLICATION:
		if message.structure.get_name() == "CHECKPOINT":
			timestamp = message.structure["timestamp"]
	elif message.type == gst.MESSAGE_EOS:
		with self.dataclass.lock:
			# last closed trigger-segment boundary, in nanoseconds
			timestamp = self.seglistdicts["triggersegments"].extent_all()[1].ns()
def _close_segments(self, timestamp):
	# Close out every currently-open segment at *timestamp*
	# (nanoseconds) and start new ones.
	for (segtype, instrument), start_time in list(self.current_segment_start.items()):
		if timestamp < start_time.ns():
			# NOTE(review): the body of this branch and the lines
			# between the two calls below were lost in extraction;
			# the original branch structure is uncertain -- confirm
			# against the original file.
			self._gatehandler(None, timestamp, (segtype, instrument, "off"))
			self._gatehandler(None, timestamp, (segtype, instrument, "on"))
	"""
	Record stop times for all open segments and start new ones.

	@param timestamp the time in nanoseconds at which to mark
	the boundary.  If this precedes an open segment's start
	time, that segment is not closed.
	"""
	# NOTE(review): the enclosing `def` line and the body of this
	# `with` block were lost in extraction.
	with self.dataclass.lock:
	"""
	Checkpoint, e.g., flush segments and triggers to disk.

	@param timestamp the gstreamer timestamp in nanoseconds of the current buffer in order to close off open segment intervals before writing to disk
	"""
	# NOTE(review): the enclosing `def` line and the `try:` that pairs
	# with the except below were lost in extraction.
	self.dataclass.snapshot_output_file("%s_LLOID" % self.tag, "xml.gz", verbose = self.verbose)
	except TypeError as te:
		# a snapshot with no triggers is non-fatal:  warn and carry on
		print >>sys.stderr, "Warning: couldn't build output file on checkpoint, probably there aren't any triggers: %s" % te
	"""
	Flush segments to disk, e.g., when checkpointing or shutting
	down an online pipeline.

	@param timestamp the gstreamer timestamp in nanoseconds of the current buffer in order to close off open segment intervals before writing to disk
	"""
	# NOTE(review): the enclosing `def` line and several interior lines
	# were lost in extraction;  nesting is partly inferred.
	with self.dataclass.lock:
		# overall GPS extent and instrument set spanned by all of the
		# tracked segment lists
		ext = segments.segmentlist(seglistdict.extent_all() for seglistdict in self.seglistdicts.values()).extent()
		instruments = set(instrument for seglistdict in self.seglistdicts.values() for instrument in seglistdict)
		# directory named after the first 5 digits of the start time
		path = str(int(math.floor(ext[0])))[:5]
		fname = "%s/%s-%s_SEGMENTS-%d-%d.xml.gz" % (path, "".join(sorted(instruments)), self.tag, int(math.floor(ext[0])), int(math.ceil(ext[1])) - int(math.floor(ext[0])))
		ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, gz = fname.endswith('.gz'), verbose = self.verbose, trap_signals = None)
		# NOTE(review): the statements inside the loop and branch
		# below were lost in extraction, as was the try/except that
		# presumably pairs with the final warning -- confirm against
		# the original file.
		for segtype, seglistdict in self.seglistdicts.items():
			if segtype == "triggersegments":
				for seglist in seglistdict.values():
		print >>sys.stderr, "Warning: couldn't build segment list on checkpoint, probably there aren't any segments"
def _gatehandler(self, elem, timestamp, (segtype, instrument, new_state)):
	# Internal gate-transition bookkeeping;  gatehandler() acquires
	# self.dataclass.lock before delegating here.
	timestamp = LIGOTimeGPS(0, timestamp)	# timestamp is in nanoseconds
	state_key = (segtype, instrument)
	if self.verbose and elem is not None:
		print >>sys.stderr, "%s: %s '%s' state transition: %s @ %s" % (elem.get_name(), instrument, segtype, new_state, str(timestamp))
	# close the currently-open segment for this (segtype, instrument)
	self.seglistdicts[segtype][instrument] |= segments.segmentlist((segments.segment(self.current_segment_start.pop(state_key), timestamp),))
	if new_state == "on":
		# NOTE(review): the "on" branch body (and the `else:` under
		# which the assert presumably sat) were lost in extraction.
		assert new_state == "off"
def gatehandler(self, elem, timestamp, (segtype, instrument, new_state)):
	"""
	A handler that intercepts gate state transitions.  This can set
	the "on" segments for each detector

	@param elem A reference to the lal_gate element or None (only used for verbosity)
	@param timestamp A gstreamer time stamp that marks the state transition (in nanoseconds)
	@param segtype the class of segments this gate is defining, e.g., "datasegments", etc..
	@param instrument the instrument this state transition is to be attributed to, e.g., "H1", etc..
	@param new_state the state transition, must be either "on" or "off"
	"""
	# serialize access to the segment bookkeeping, then delegate
	with self.dataclass.lock:
		self._gatehandler(elem, timestamp, (segtype, instrument, new_state))
	"""
	A method to output the segment list in a valid ligolw xml
	"""
	# NOTE(review): the enclosing `def gen_segments_xmldoc(self):` line
	# was lost in extraction, as was the final `return xmldoc` (the
	# document is consumed by flush_segments_to_disk, so the return
	# must exist in the original file).
	xmldoc = ligolw.Document()
	xmldoc.appendChild(ligolw.LIGO_LW())
	process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
	with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments:
		for segtype, seglistdict in self.seglistdicts.items():
			ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot")
	ligolw_process.set_process_end_time(process)
	"""
	provide a bottle route to get segment information via a url
	"""
	# NOTE(review): the enclosing `def` line, the write call that fills
	# `output`, and the return of `outstr` were lost in extraction --
	# confirm against the original file.
	with self.dataclass.lock:
		output = StringIO.StringIO()
		outstr = output.getvalue()
def gen_psd_xmldoc(self):
	"""
	Build and return a LIGO light-weight XML document containing the
	most recently recorded PSDs (self.psds) plus process metadata.
	"""
	xmldoc = lalseries.make_psd_xmldoc(self.psds)
	process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
	ligolw_process.set_process_end_time(process)
	# callers (web_get_psd_xml) serialize this document with
	# write_fileobj, so it must be returned;  the return statement was
	# lost in the mangled extraction and is restored here.
	return xmldoc
def web_get_psd_xml(self):
	# Serialize the current PSD document to XML text (presumably a
	# bottle route, like web_get_segments_xml -- confirm).
	with self.dataclass.lock:
		output = StringIO.StringIO()
		ligolw_utils.write_fileobj(self.gen_psd_xmldoc(), output, trap_signals = None)
		outstr = output.getvalue()
	# NOTE(review): the final `return outstr` was lost in extraction.
def mkLLOIDbranch(pipeline, src, bank, bank_fragment, (control_snk, control_src), gate_attack_length, gate_hold_length, block_duration, nxydump_segment = None, fir_stride = None, control_peak_time = None):
	"""
	Make a single slice of one branch of the lloid graph, e.g. one instrument and one
	template bank fragment.  For details see: http://arxiv.org/abs/1107.2665

	Specifically this implements the filtering of multirate svd basis and
	(conditional) resampling and reconstruction of the physical SNR

	@param pipeline The gstreamer pipeline in which to place this graph
	@param src The source of data for this graph provided by a gstreamer element
	@param bank The template bank class
	@param bank_fragment The specific fragment (time slice) of the template bank in question
	@param (control_snk, control_src) An optional tuple of the sink and source elements for a graph that will construct a control time series for the gate which aggregates the orthogonal snrs from each template slice.  This is used to conditionally reconstruct the physical SNR of interesting times
	@param gate_attack_length The attack length in samples for the lal_gate element that controls the reconstruction of physical SNRs
	@param gate_hold_length The hold length in samples for the lal_gate element that controls the reconstruction of physical SNRs
	@param block_duration The characteristic buffer size that is passed around, which is useful for constructing queues.
	@param nxydump_segment Not used
	@param fir_stride The target length of output buffers from lal_firbank.  Directly effects latency.  Making this short will force time-domain convolution.  Otherwise FFT convolution will be done to save CPU cycles, but at higher latency.
	@param control_peak_time The window over which to find peaks in the control signal.  Shorter windows increase computational cost but probably also detection efficiency.
	"""
	# NOTE(review): several interior lines of this function were lost in
	# extraction;  nesting between surviving statements is partly
	# inferred.
	logname = "%s_%.2f.%.2f" % (bank.logname, bank_fragment.start, bank_fragment.end)

	# negative latency shifts the FIR bank output back to the fragment's
	# start time
	latency = -int(round(bank_fragment.start * bank_fragment.rate))
	block_stride = fir_stride * bank_fragment.rate

	# choose time-domain vs. FFT convolution for this fragment size and
	# stride (see the fir_stride parameter documentation above)
	time_domain = 5 * numpy.log2((bank_fragment.end - bank_fragment.start) * bank_fragment.rate + block_stride) > block_stride

	src = pipeparts.mkfirbank(pipeline, src, latency = latency, fir_matrix = bank_fragment.orthogonal_template_bank, block_stride = block_stride, time_domain = time_domain)
	src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_firbank" % logname)

	if control_snk is not None and control_src is not None:
		# feed this fragment's sum-of-squares stream into the shared
		# control-signal adder, resampled to the bank's highest rate
		src = pipeparts.mktee(pipeline, src)
		elem = pipeparts.mkqueue(pipeline, pipeparts.mksumsquares(pipeline, src, weights = bank_fragment.sum_of_squares_weights), max_size_buffers = 0, max_size_bytes = 0, max_size_time = block_duration)
		elem = pipeparts.mkchecktimestamps(pipeline, elem, "timestamps_%s_after_sumsquare" % logname)
		elem = pipeparts.mkcapsfilter(pipeline, pipeparts.mkresample(pipeline, elem, quality = 9), "audio/x-raw-float, rate=%d" % max(bank.get_rates()))
		elem = pipeparts.mknofakedisconts(pipeline, elem)
		elem = pipeparts.mkchecktimestamps(pipeline, elem, "timestamps_%s_after_sumsquare_resampler" % logname)
		elem.link(control_snk)

		# gate the orthogonal SNR stream on the control signal;  queue
		# sizes cover the peak window plus gate attack/hold lengths.
		# NOTE(review): intermediate argument lines of this mkgate
		# call (including its closing parenthesis) were lost in
		# extraction.
		src = pipeparts.mkgate(
			pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * (2 * control_peak_time + (abs(gate_attack_length) + abs(gate_hold_length)) / bank_fragment.rate) * gst.SECOND + 12 * block_duration),
			attack_length = gate_attack_length,
			hold_length = gate_hold_length,
			control = pipeparts.mkqueue(pipeline, control_src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * (2 * control_peak_time + (abs(gate_attack_length) + abs(gate_hold_length)) / bank_fragment.rate) * gst.SECOND + 12 * block_duration)
		src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_gate" % logname)

	src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 1 * block_duration)

	# reconstruct physical SNR with the mixing matrix, if present
	if bank_fragment.mix_matrix is not None:
		src = pipeparts.mkmatrixmixer(pipeline, src, matrix = bank_fragment.mix_matrix)
		src = pipeparts.mkchecktimestamps(pipeline, src, "timestamps_%s_after_matrixmixer" % logname)

	# these large arrays are no longer needed;  release the memory
	del bank_fragment.orthogonal_template_bank
	del bank_fragment.sum_of_squares_weights
	del bank_fragment.mix_matrix
	# NOTE(review): the function's return statement was lost in
	# extraction.
def mkLLOIDhoftToSnrSlices(pipeline, hoftdict, bank, control_snksrc, block_duration, verbose = False, logname = "", nxydump_segment = None, fir_stride = None, control_peak_time = None, snrslices = None):
	"""
	Build the pipeline fragment that creates the SnrSlices associated with
	different sample rates from hoft.
	"""
	# NOTE(review): this function is heavily truncated by extraction;
	# several statements, the mkLLOIDbranch call wrapper, and branch
	# structure were lost, so nesting is partly inferred.
	rates = sorted(bank.get_rates())
	output_rate = max(rates)

	# ratio of each rate to the next higher rate;  the highest rate is
	# never upsampled
	upsample_factor = dict((rate, rates[i+1] / rate) for i, rate in list(enumerate(rates))[:-1])
	upsample_factor[output_rate] = 0

	autocorrelation_length = bank.autocorrelation_bank.shape[1]
	assert autocorrelation_length % 2 == 1
	autocorrelation_latency = -(autocorrelation_length - 1) / 2

	# collect branch heads per sample rate
	branch_heads = dict((rate, set()) for rate in rates)
	for bank_fragment in bank.bank_fragments:
		peak_half_width = upsample_factor[bank_fragment.rate] * 16 + int(math.ceil(-autocorrelation_latency * (float(bank_fragment.rate) / output_rate))) + int(math.ceil(bank_fragment.rate / 8.))
		# NOTE(review): these lines are surviving arguments of a call
		# (presumably mkLLOIDbranch) whose surrounding lines were
		# lost in extraction.
		pipeparts.mkqueue(pipeline, pipeparts.mkdrop(pipeline, hoftdict[bank_fragment.rate], int(round((bank.filter_length - bank_fragment.end) * bank_fragment.rate))), max_size_bytes = 0, max_size_buffers = 0, max_size_time = (1 * fir_stride + int(math.ceil(bank.filter_length))) * gst.SECOND),
		nxydump_segment = nxydump_segment,
		fir_stride = fir_stride,
		control_peak_time = control_peak_time

	if snrslices is not None:
		# expose the full-rate SNR slice to the caller
		rate, heads = output_rate, branch_heads[output_rate]
		branch_heads[rate] = pipeparts.mkadder(pipeline, (pipeparts.mkqueue(pipeline, head, max_size_bytes = 0, max_size_buffers = 0, max_size_time = block_duration) for head in heads))
		# NOTE(review): the branch selecting between the adder above
		# and the single-head unpack below was lost in extraction.
		branch_heads[rate], = heads
		branch_heads[rate] = pipeparts.mktee(pipeline, branch_heads[rate])
		snrslices[rate] = pipeparts.mktogglecomplex(pipeline, branch_heads[rate])
		branch_heads[rate] = set([branch_heads[rate]])

	# NOTE(review): the branch selecting between these two next_rate
	# definitions was lost in extraction.
	next_rate = dict(zip(rates, rates[1:]))
	next_rate = dict((rate, output_rate) for rate in rates if rate != output_rate)

	for rate, heads in sorted(branch_heads.items()):
		# sum the heads at this rate into a single stream
		branch_heads[rate] = pipeparts.mkadder(pipeline, (pipeparts.mkqueue(pipeline, head, max_size_bytes = 0, max_size_buffers = 0, max_size_time = 1 * block_duration) for head in heads))
		branch_heads[rate] = pipeparts.mkcapsfilter(pipeline, branch_heads[rate], "audio/x-raw-float, rate=%d" % rate)
		branch_heads[rate] = pipeparts.mkchecktimestamps(pipeline, branch_heads[rate], "timestamps_%s_after_%d_snr_adder" % (logname, rate))
		# NOTE(review): the branch around this single-head unpack was
		# lost in extraction.
		branch_heads[rate], = heads
		if rate in next_rate:
			# upsample this stream to the next rate
			branch_heads[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkresample(pipeline, branch_heads[rate], quality = 1), "audio/x-raw-float, rate=%d" % next_rate[rate])
			branch_heads[rate] = pipeparts.mknofakedisconts(pipeline, branch_heads[rate])
			branch_heads[rate] = pipeparts.mkchecktimestamps(pipeline, branch_heads[rate], "timestamps_%s_after_%d_to_%d_snr_resampler" % (logname, rate, next_rate[rate]))
		if snrslices is not None and rate != output_rate:
			# expose this intermediate-rate slice to the caller
			branch_heads[rate] = pipeparts.mktee(pipeline, branch_heads[rate])
			snrslices[rate] = pipeparts.mktogglecomplex(pipeline, branch_heads[rate])
		if rate in next_rate:
			# hand this stream to the next rate's adder
			branch_heads[next_rate[rate]].add(branch_heads.pop(rate))

	# exactly one head remains:  the full-rate SNR stream
	snr, = branch_heads.values()
	return pipeparts.mktogglecomplex(pipeline, snr)
	"""
	Build pipeline fragment that computes the TimeSliceChisq from SnrSlices.
	"""
	# NOTE(review): the enclosing `def` line and several interior lines
	# (including the initialization of `chifacs` and the inner loop that
	# binds `fac`) were lost in extraction.
	rates = sorted(bank.get_rates())

	# per-rate lists of chifacs vectors, one entry per bank fragment
	chifacsdict = dict((rate, []) for rate in rates)
	for bank_fragment in bank.bank_fragments:
		chifacsdict[bank_fragment.rate].append(bank_fragment.chifacs)

	for rate, facs in sorted(chifacsdict.items()):
		# accumulate even- and odd-indexed entries of each vector
		chifacs.append(facs[0][0::2])
		chifacs[-1] += facs[0][1::2]
		chifacs[-1] += fac[0::2]
		chifacs[-1] += fac[1::2]

	chisq = gst.element_factory_make("lal_timeslicechisq")

	# feed each SNR slice into the chisq element, one queue per rate
	for rate, snrslice in sorted(branch_heads.items()):
		pipeparts.mkqueue(pipeline, snrslice, max_size_bytes = 0, max_size_buffers = 0, max_size_time = block_duration).link(chisq)

	chisq.set_property("chifacs-matrix", chifacs)

	return pipeparts.mkqueue(pipeline, chisq, max_size_bytes = 0, max_size_buffers = 0, max_size_time = block_duration)
	"""
	Build pipeline fragment that converts single detector SNR and Chisq
	"""
	# NOTE(review): the enclosing `def` line, the rest of the docstring,
	# and the function's return were lost in extraction.
	head = pipeparts.mktriggergen(pipeline, snr, chisq, template_bank_filename = bank.template_bank_filename, snr_threshold = bank.snr_threshold, sigmasq = bank.sigmasq)
	head = pipeparts.mkprogressreport(pipeline, head, "progress_xml_%s" % logname)
def mkLLOIDmulti(pipeline, detectors, banks, psd, psd_fft_length = 8, ht_gate_threshold = float("inf"), veto_segments = None, verbose = False, nxydump_segment = None, chisq_type = 'autochisq', track_psd = False, fir_stride = 16, control_peak_time = 2, block_duration = gst.SECOND, reconstruction_segment_list = None):
	"""
	The multiple instrument, multiple bank LLOID algorithm
	"""
	# NOTE(review): several interior lines (including some `else:` lines
	# and call wrappers) were lost in extraction;  nesting between the
	# surviving statements is partly inferred.

	# sanity checks:  recognized chisq type, unique bank IDs
	if chisq_type not in ['autochisq', 'timeslicechisq']:
		raise ValueError("chisq_type must be either 'autochisq' or 'timeslicechisq', given %s" % chisq_type)
	if any(tuple(iterutils.nonuniq(bank.bank_id for bank in banklist)) for banklist in banks.values()):
		raise ValueError("bank IDs are not unique: %s" % "; ".join("for %s: %s" % (instrument, iterutils.nonuniq(bank.bank_id for bank in banklist)) for instrument, banklist in banks.items()))

	# build one whitened, multirate h(t) source per instrument, covering
	# every sample rate any of its banks needs
	for instrument in detectors.channel_dict:
		rates = set(rate for bank in banks[instrument] for rate in bank.get_rates())
		src = datasource.mkbasicsrc(pipeline, detectors, instrument, verbose)
		hoftdicts[instrument] = multirate_datasource.mkwhitened_multirate_src(pipeline, src, rates, instrument, psd = psd[instrument], psd_fft_length = psd_fft_length, ht_gate_threshold = ht_gate_threshold, veto_segments = veto_segments[instrument] if veto_segments is not None else None, seekevent = detectors.seekevent, nxydump_segment = nxydump_segment, track_psd = track_psd, zero_pad = 0, width = 32)

	# build the control-signal branches, one per (instrument, bank);
	# H2 is skipped here and reuses H1's control source below
	if control_peak_time > 0 or reconstruction_segment_list is not None:
		for instrument, bank in [(instrument, bank) for instrument, banklist in banks.items() for bank in banklist]:
			suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
			if instrument != "H2":
				control_branch[(instrument, bank.bank_id)] = mkcontrolsnksrc(pipeline, max(bank.get_rates()), verbose = verbose, suffix = suffix, reconstruction_segment_list = reconstruction_segment_list, seekevent = detectors.seekevent, control_peak_samples = control_peak_time * max(bank.get_rates()))
	# NOTE(review): the `else:` line above this assignment was lost in
	# extraction.
	control_branch = None

	triggersrcs = dict((instrument, set()) for instrument in hoftdicts)
	for instrument, bank in [(instrument, bank) for instrument, banklist in banks.items() for bank in banklist]:
		suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
		# pick this branch's control sink/source pair
		if control_branch is not None:
			if instrument != "H2":
				control_snksrc = control_branch[(instrument, bank.bank_id)]
			# NOTE(review): the `else:` lines around the next two
			# assignments were lost in extraction;  H2 reuses H1's
			# control source, and with no control branch both
			# halves are None.
			control_snksrc = (None, control_branch[("H1", bank.bank_id)][1])
		control_snksrc = (None, None)
		if chisq_type == 'timeslicechisq':
			# NOTE(review): these lines are surviving arguments of
			# a call (presumably mkLLOIDhoftToSnrSlices) whose
			# surrounding lines were lost in extraction.
			hoftdicts[instrument],
			nxydump_segment = nxydump_segment,
			control_peak_time = control_peak_time,
			fir_stride = fir_stride,
			snrslices = snrslices
		snr = pipeparts.mkchecktimestamps(pipeline, snr, "timestamps_%s_snr" % suffix)
		if chisq_type == 'autochisq':
			# single-detector triggers via the itac element
			head = pipeparts.mkitac(pipeline, snr, 1 * max(rates), bank.template_bank_filename, autocorrelation_matrix = bank.autocorrelation_bank, mask_matrix = bank.autocorrelation_mask, snr_thresh = bank.snr_threshold, sigmasq = bank.sigmasq)
			head = pipeparts.mkprogressreport(pipeline, head, "progress_xml_%s" % suffix)
			triggersrcs[instrument].add(head)
		# NOTE(review): the `else:` line above this raise was lost in
		# extraction.
		raise NotImplementedError("Currently only 'autochisq' is supported")
		# the autocorrelation bank is no longer needed;  free it
		del bank.autocorrelation_bank

	assert any(triggersrcs.values())