gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
__init__.py
Go to the documentation of this file.
1 # Copyright (C) 2009--2013 LIGO Scientific Collaboration
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 import math
28 import os
29 import sys
30 import threading
31 
32 
33 import pygtk
34 pygtk.require("2.0")
35 import gobject
36 gobject.threads_init()
37 import pygst
38 pygst.require('0.10')
39 import gst
40 
41 
42 from glue import iterutils
43 from glue import lal
44 from glue import segments
45 from gstlal import pipeio
46 from pylal.xlal.datatypes.ligotimegps import LIGOTimeGPS
47 
48 
49 __author__ = "Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>, Drew Keppel <drew.keppel@ligo.org>"
50 __version__ = "FIXME"
51 __date__ = "FIXME"
52 
53 
54 ##
55 # @file
56 #
57 # A file that contains the pipeparts module code
58 #
59 
60 ##
61 # @package python.pipeparts
62 #
63 # pipeparts module
64 
65 
66 #
67 # =============================================================================
68 #
69 # Generic Constructors
70 #
71 # =============================================================================
72 #
73 
74 
75 #
76 # Applications should use the element-specific wrappings that follow below.
77 # The generic constructors are only intended to simplify the writing of
78 # those wrappings, they are not meant to be how applications create
79 # elements in pipelines.
80 #
81 
82 
83 def mkgeneric(pipeline, src, elem_type_name, **properties):
84  if "name" in properties:
85  elem = gst.element_factory_make(elem_type_name, properties.pop("name"))
86  else:
87  elem = gst.element_factory_make(elem_type_name)
88  for name, value in properties.items():
89  elem.set_property(name.replace("_", "-"), value)
90  pipeline.add(elem)
91  if isinstance(src, gst.Pad):
92  src.get_parent_element().link_pads(src, elem, None)
93  elif src is not None:
94  src.link(elem)
95  return elem
96 
97 
98 #
99 # deferred link helper
100 #
101 
102 
103 class src_deferred_link(object):
104  """!
105  A class that manages the task of watching for and connecting to new
106  source pads by name. The inputs are an element, the name of the
107  source pad to watch for on that element, and the sink pad (on a
108  different element) to which the source pad should be linked when it
109  appears.
110 
111  The "pad-added" signal of the element will be used to watch for new
112  pads, and if the "no-more-pads" signal is emitted by the element
113  before the requested pad has appeared ValueException is raised.
114  """
115  def __init__(self, element, srcpadname, sinkpad):
116  no_more_pads_handler_id = element.connect("no-more-pads", self.no_more_pads, srcpadname)
117  assert no_more_pads_handler_id > 0
118  pad_added_data = [srcpadname, sinkpad, no_more_pads_handler_id]
119  pad_added_handler_id = element.connect("pad-added", self.pad_added, pad_added_data)
120  assert pad_added_handler_id > 0
121  pad_added_data.append(pad_added_handler_id)
122 
123  @staticmethod
124  def pad_added(element, pad, (srcpadname, sinkpad, no_more_pads_handler_id, pad_added_handler_id)):
125  if pad.get_name() == srcpadname:
126  element.handler_disconnect(no_more_pads_handler_id)
127  element.handler_disconnect(pad_added_handler_id)
128  pad.link(sinkpad)
129 
130  @staticmethod
131  def no_more_pads(element, srcpadname):
132  raise ValueError("<%s>: no pad named '%s'" % (element.get_name(), srcpadname))
133 
134 
135 #
136 # framecpp channeldemux helpers
137 #
138 
139 
141  def __init__(self, elem, units_dict):
142  """
143  Connect a handler for the pad-added signal of the
144  framecpp_channeldemux element elem, and when a pad is added
145  to the element if the pad's name appears as a key in the
146  units_dict dictionary that pad's units property will be set
147  to the string value associated with that key in the
148  dictionary.
149 
150  Example:
151 
152  >>> framecpp_channeldemux_set_units(elem, {"H1:LSC-STRAIN": "strain"})
153 
154  NOTE: this is a work-around to address the problem that
155  most (all?) frame files do not have units set on their
156  channel data, whereas downstream consumers of the data
157  might require information about the units. The demuxer
158  provides the units as part of a tag event, and
159  framecpp_channeldemux_set_units() can be used to override
160  the values, thereby correcting absent or incorrect units
161  information.
162  """
163  self.elem = elem
164  self.pad_added_handler_id = elem.connect("pad-added", self.pad_added, units_dict)
165 
166  @staticmethod
167  def pad_added(element, pad, units_dict):
168  name = pad.get_name()
169  if name in units_dict:
170  pad.set_property("units", units_dict[name])
171 
172 
174  def __init__(self, elem, seglists, jitter = LIGOTimeGPS(0, 1)):
175  self.elem = elem
176  self.probe_handler_ids = {}
177  self.jitter = jitter
178  # keep a copy of the segmentlistdict incase the calling
179  # code modifies it
180  self.pad_added_handler_id = elem.connect("pad-added", self.pad_added, seglists.copy())
181 
182  def pad_added(self, element, pad, seglists):
183  name = pad.get_name()
184  if name in self.probe_handler_ids:
185  pad.remove_data_probe(self.probe_handler_ids.pop(name))
186  if name in seglists:
187  self.probe_handler_ids[name] = self.set_probe(pad, seglists[name], self.jitter)
188 
189  @staticmethod
190  def set_probe(pad, seglist, jitter = LIGOTimeGPS(0, 1)):
191  # use a copy of the segmentlist so the probe can modify it
192  return pad.add_data_probe(framecpp_channeldemux_check_segments.probe, (segments.segmentlist(seglist), jitter))
193 
194  @staticmethod
195  def probe(pad, obj, (seglist, jitter)):
196  if isinstance(obj, gst.Buffer):
197  if not obj.flag_is_set(gst.BUFFER_FLAG_GAP):
198  # remove the current buffer from the data
199  # we're expecting to see
200  seglist -= segments.segmentlist([segments.segment((LIGOTimeGPS(0, obj.timestamp), LIGOTimeGPS(0, obj.timestamp + obj.duration)))])
201  # ignore missing data intervals unless
202  # they're bigger than the jitter
203  iterutils.inplace_filter(lambda seg: abs(seg) > jitter, seglist)
204  # are we still expecting to see something that
205  # precedes the current buffer?
206  preceding = segments.segment((segments.NegInfinity, LIGOTimeGPS(0, obj.timestamp)))
207  if seglist.intersects_segment(preceding):
208  raise ValueError("%s: detected missing data: %s" % (pad.get_name(), seglist & segments.segmentlist([preceding])))
209  elif isinstance(obj, gst.Event) and obj.type == gst.EVENT_EOS:
210  if seglist:
211  raise ValueError("%s: at EOS detected missing data: %s" % (pad.get_name(), seglist))
212  return True
213 
214 
215 #
216 # framecpp file sink helpers
217 #
218 
219 
220 def framecpp_filesink_ldas_path_handler(elem, pspec, (outpath, dir_digits)):
221  """
222  Example:
223 
224  >>> filesinkelem.connect("notify::timestamp", framecpp_filesink_ldas_path_handler, (".", 5))
225  """
226  # get timestamp and truncate to integer seconds
227  timestamp = elem.get_property("timestamp") // gst.SECOND
228 
229  # extract leading digits
230  leading_digits = timestamp // 10**int(math.log10(timestamp) + 1 - dir_digits)
231 
232  # get other metadata
233  instrument = elem.get_property("instrument")
234  frame_type = elem.get_property("frame-type")
235 
236  # make target directory, and set path
237  path = os.path.join(outpath, "%s-%s-%d" % (instrument, frame_type, leading_digits))
238  if not os.path.exists(path):
239  os.makedirs(path)
240  elem.set_property("path", path)
241 
242 
244  """
245  Translate an element message posted by the multifilesink element
246  inside a framecpp_filesink bin into a lal.CacheEntry object
247  describing the file being written by the multifilesink element.
248  """
249  # extract the segment spanned by the file from the message directly
250  start = LIGOTimeGPS(0, message.structure["timestamp"])
251  end = start + LIGOTimeGPS(0, message.structure["duration"])
252 
253  # retrieve the framecpp_filesink bin (for instrument/observatory
254  # and frame file type)
255  parent = message.src.get_parent()
256 
257  # construct and return a CacheEntry object
258  return lal.CacheEntry(parent.get_property("instrument"), parent.get_property("frame-type"), segments.segment(start, end), "file://localhost%s" % os.path.abspath(message.structure["filename"]))
259 
260 
261 #
262 # =============================================================================
263 #
264 # Pipeline Parts
265 #
266 # =============================================================================
267 #
268 
269 
270 def mkchannelgram(pipeline, src, **properties):
271  return mkgeneric(pipeline, src, "lal_channelgram", **properties)
272 
273 
274 def mkspectrumplot(pipeline, src, **properties):
275  return mkgeneric(pipeline, src, "lal_spectrumplot", **properties)
276 
277 
278 def mkhistogram(pipeline, src):
279  return mkgeneric(pipeline, src, "lal_histogramplot")
280 
281 
282 ## Adds a <a href="@gstlalgtkdoc/GSTLALSegmentSrc.html">lal_segmentsrc</a> element to a pipeline with useful default properties
283 def mksegmentsrc(pipeline, segment_list, blocksize = 4096 * 1 * 1, invert_output = False):
284  # default blocksize is 4096 seconds of unsigned integers at
285  # 1 Hz, e.g. segments without nanoseconds
286  return mkgeneric(pipeline, None, "lal_segmentsrc", blocksize = blocksize, segment_list = segments.segmentlist(segments.segment(a.ns(), b.ns()) for a, b in segment_list), invert_output = invert_output)
287 
288 
289 ## Adds a <a href="@gstlalgtkdoc/GstLALCacheSrc.html">lal_cachesrc</a> element to a pipeline with useful default properties
290 def mklalcachesrc(pipeline, location, use_mmap = True, **properties):
291  return mkgeneric(pipeline, None, "lal_cachesrc", location = location, use_mmap = use_mmap, **properties)
292 
293 
294 def mklvshmsrc(pipeline, shm_name, **properties):
295  return mkgeneric(pipeline, None, "gds_lvshmsrc", shm_name = shm_name, **properties)
296 
297 
298 def mkframexmitsrc(pipeline, multicast_group, port, **properties):
299  return mkgeneric(pipeline, None, "gds_framexmitsrc", multicast_group = multicast_group, port = port, **properties)
300 
301 
302 def mkigwdparse(pipeline, src, **properties):
303  return mkgeneric(pipeline, src, "framecpp_igwdparse", **properties)
304 
305 
306 ## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-uridecodebin.html">uridecodebin</a> element to a pipeline with useful default properties
307 def mkuridecodebin(pipeline, uri, caps = "application/x-igwd-frame,framed=true", **properties):
308  return mkgeneric(pipeline, None, "uridecodebin", uri = uri, caps = None if caps is None else gst.Caps(caps), **properties)
309 
310 
311 def mkframecppchanneldemux(pipeline, src, **properties):
312  return mkgeneric(pipeline, src, "framecpp_channeldemux", **properties)
313 
314 
315 def mkframecppchannelmux(pipeline, channel_src_map, units = None, seglists = None, **properties):
316  elem = mkgeneric(pipeline, None, "framecpp_channelmux", **properties)
317  if channel_src_map is not None:
318  for channel, src in channel_src_map.items():
319  for srcpad in src.src_pads():
320  if srcpad.link(elem.get_pad(channel)) == gst.PAD_LINK_OK:
321  break
322  if units is not None:
324  if seglists is not None:
326  return elem
327 
328 
329 def mkframecppfilesink(pipeline, src, message_forward = True, **properties):
330  post_messages = properties.pop("post_messages", True)
331  elem = mkgeneric(pipeline, src, "framecpp_filesink", message_forward = message_forward, **properties)
332  # FIXME: there's supposed to be some sort of proxy mechanism for
333  # setting properties on child elements, but we can't seem to get
334  # anything to work
335  elem.get_by_name("multifilesink").set_property("post-messages", post_messages)
336  return elem
337 
338 
339 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-multifilesink.html">multifilesink</a> element to a pipeline with useful default properties
340 def mkmultifilesink(pipeline, src, next_file = 0, sync = False, async = False, **properties):
341  return mkgeneric(pipeline, src, "multifilesink", next_file = next_file, sync = sync, async = async, **properties)
342 
343 
344 def mkndssrc(pipeline, host, instrument, channel_name, channel_type, blocksize = 16384 * 8 * 1, port = 31200):
345  # default blocksize is 1 second of double precision floats at
346  # 16384 Hz, e.g., LIGO h(t)
347  return mkgeneric(pipeline, None, "ndssrc", blocksize = blocksize, port = port, host = host, channel_name = "%s:%s" % (instrument, channel_name), channel_type = channel_type)
348 
349 
350 ## Adds a <a href="@gstdoc/gstreamer-plugins-capsfilter.html">capsfilter</a> element to a pipeline with useful default properties
351 def mkcapsfilter(pipeline, src, caps):
352  return mkgeneric(pipeline, src, "capsfilter", caps = gst.Caps(caps))
353 
354 
355 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-capssetter.html">capssetter</a> element to a pipeline with useful default properties
356 def mkcapssetter(pipeline, src, caps, **properties):
357  return mkgeneric(pipeline, src, "capssetter", caps = gst.Caps(caps), **properties)
358 
359 
360 ## Adds a <a href="@gstlalgtkdoc/GSTLALStateVector.html">lal_statevector</a> element to a pipeline with useful default properties
361 def mkstatevector(pipeline, src, **properties):
362  return mkgeneric(pipeline, src, "lal_statevector", **properties)
363 
364 
365 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-taginject.html">taginject</a> element to a pipeline with useful default properties
366 def mktaginject(pipeline, src, tags):
367  return mkgeneric(pipeline, src, "taginject", tags = tags)
368 
369 
370 ## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audiotestsrc.html">audiotestsrc</a> element to a pipeline with useful default properties
371 def mkaudiotestsrc(pipeline, **properties):
372  return mkgeneric(pipeline, None, "audiotestsrc", **properties)
373 
374 
375 ## see documentation for mktaginject() mkcapsfilter() and mkaudiotestsrc()
376 def mkfakesrc(pipeline, instrument, channel_name, blocksize = None, volume = 1e-20, is_live = False, wave = 9, rate = 16384):
377  if blocksize is None:
378  # default blocksize is 1 second * rate samples/second * 8
379  # bytes/sample (assume double-precision floats)
380  blocksize = 1 * rate * 8
381  return mktaginject(pipeline, mkcapsfilter(pipeline, mkaudiotestsrc(pipeline, samplesperbuffer = blocksize / 8, wave = wave, volume = volume, is_live = is_live), "audio/x-raw-float, width=64, rate=%d" % rate), "instrument=%s,channel-name=%s,units=strain" % (instrument, channel_name))
382 
383 
384 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audiofirfilter.html">audiofirfilter</a> element to a pipeline with useful default properties
385 def mkfirfilter(pipeline, src, kernel, latency, **properties):
386  properties.update((name, val) for name, val in (("kernel", kernel), ("latency", latency)) if val is not None)
387  return mkgeneric(pipeline, src, "audiofirfilter", **properties)
388 
389 
390 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audioiirfilter.html">audioiirfilter</a> element to a pipeline with useful default properties
391 def mkiirfilter(pipeline, src, a, b):
392  # convention is z = \exp(-i 2 \pi f / f_{\rm sampling})
393  # H(z) = (\sum_{j=0}^{N} a_j z^{-j}) / (\sum_{j=0}^{N} (-1)^{j} b_j z^{-j})
394  return mkgeneric(pipeline, src, "audioiirfilter", a = a, b = b)
395 
396 
397 ## Adds a <a href="@gstlalgtkdoc/GSTLALShift.html">lal_shift</a> element to a pipeline with useful default properties
398 def mkshift(pipeline, src, **properties):
399  return mkgeneric(pipeline, src, "lal_shift", **properties)
400 
401 
402 def mkfakeLIGOsrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
403  properties = {"blocksize": blocksize}
404  properties.update((name, val) for name, val in (("instrument", instrument), ("channel_name", channel_name)) if val is not None)
405  return mkgeneric(pipeline, None, "lal_fakeligosrc", **properties)
406 
407 
408 def mkfakeadvLIGOsrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
409  properties = {"blocksize": blocksize}
410  properties.update((name, val) for name, val in (("instrument", instrument), ("channel_name", channel_name)) if val is not None)
411  return mkgeneric(pipeline, None, "lal_fakeadvligosrc", **properties)
412 
413 
414 def mkfakeadvvirgosrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
415  properties = {"blocksize": blocksize}
416  if instrument is not None:
417  properties["instrument"] = instrument
418  if channel_name is not None:
419  properties["channel_name"] = channel_name
420  return mkgeneric(pipeline, None, "lal_fakeadvvirgosrc", **properties)
421 
422 
423 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-progressreport.html">progress_report</a> element to a pipeline with useful default properties
424 def mkprogressreport(pipeline, src, name):
425  return mkgeneric(pipeline, src, "progressreport", do_query = False, name = name)
426 
427 
428 ## Adds a <a href="@gstlalgtkdoc/GSTLALSimulation.html">lal_simulation</a> element to a pipeline with useful default properties
429 def mkinjections(pipeline, src, filename):
430  return mkgeneric(pipeline, src, "lal_simulation", xml_location = filename)
431 
432 
433 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audiochebband.html">audiochebband</a> element to a pipeline with useful default properties
434 def mkaudiochebband(pipeline, src, lower_frequency, upper_frequency, poles = 8):
435  return mkgeneric(pipeline, src, "audiochebband", lower_frequency = lower_frequency, upper_frequency = upper_frequency, poles = poles)
436 
437 
438 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audiocheblimit.html">audiocheblimit</a> element to a pipeline with useful default properties
439 def mkaudiocheblimit(pipeline, src, cutoff, mode = 0, poles = 8):
440  return mkgeneric(pipeline, src, "audiocheblimit", cutoff = cutoff, mode = mode, poles = poles)
441 
442 
443 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audioamplify.html">audioamplify</a> element to a pipeline with useful default properties
444 def mkaudioamplify(pipeline, src, amplification):
445  return mkgeneric(pipeline, src, "audioamplify", clipping_method = 3, amplification = amplification)
446 
447 
448 ## Adds a <a href="@gstlalgtkdoc/GSTLALAudioUnderSample.html">lal_audioundersample</a> element to a pipeline with useful default properties
449 def mkaudioundersample(pipeline, src):
450  return mkgeneric(pipeline, src, "lal_audioundersample")
451 
452 
453 ## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audioresample.html">audioresample</a> element to a pipeline with useful default properties
454 def mkresample(pipeline, src, **properties):
455  return mkgeneric(pipeline, src, "audioresample", **properties)
456 
457 
458 ## Adds a <a href="@gstlalgtkdoc/GSTLALInterpolator.html">lal_interpolator</a> element to a pipeline with useful default properties
459 def mkinterpolator(pipeline, src, **properties):
460  return mkgeneric(pipeline, src, "lal_interpolator", **properties)
461 
462 
463 ## Adds a <a href="@gstlalgtkdoc/GSTLALWhiten.html">lal_whiten</a> element to a pipeline with useful default properties
464 def mkwhiten(pipeline, src, psd_mode = 0, zero_pad = 0, fft_length = 8, average_samples = 64, median_samples = 7, **properties):
465  return mkgeneric(pipeline, src, "lal_whiten", psd_mode = psd_mode, zero_pad = zero_pad, fft_length = fft_length, average_samples = average_samples, median_samples = median_samples, **properties)
466 
467 
468 ## Adds a <a href="@gstdoc/gstreamer-plugins-tee.html">tee</a> element to a pipeline with useful default properties
469 def mktee(pipeline, src):
470  return mkgeneric(pipeline, src, "tee")
471 
472 
473 ## Adds a <a href="@gstdoc/GstLALAdder.html">lal_adder</a> element to a pipeline with useful default properties
474 def mkadder(pipeline, srcs, sync = True, **properties):
475  elem = mkgeneric(pipeline, None, "lal_adder", sync = sync, **properties)
476  if srcs is not None:
477  for src in srcs:
478  src.link(elem)
479  return elem
480 
481 
482 ## Adds a <a href="@gstdoc/gstreamer-plugins-queue.html">queue</a> element to a pipeline with useful default properties
483 def mkqueue(pipeline, src, **properties):
484  return mkgeneric(pipeline, src, "queue", **properties)
485 
486 
487 ## Adds a <a href="@gstlalgtkdoc/GSTLALWhiten.html">lal_whiten</a> element to a pipeline with useful default properties
488 def mkdrop(pipeline, src, drop_samples = 0):
489  return mkgeneric(pipeline, src, "lal_drop", drop_samples = drop_samples)
490 
491 
492 ## Adds a <a href="@gstlalgtkdoc/GSTLALNoFakeDisconts.html">lal_nofakedisconts</a> element to a pipeline with useful default properties
493 def mknofakedisconts(pipeline, src, silent = True):
494  return mkgeneric(pipeline, src, "lal_nofakedisconts", silent = silent)
495 
496 
497 ## Adds a <a href="@gstlalgtkdoc/GSTLALFIRBank.html">lal_firbank</a> element to a pipeline with useful default properties
498 def mkfirbank(pipeline, src, latency = None, fir_matrix = None, time_domain = None, block_stride = None):
499  properties = dict((name, value) for name, value in zip(("latency", "fir_matrix", "time_domain", "block_stride"), (latency, fir_matrix, time_domain, block_stride)) if value is not None)
500  return mkgeneric(pipeline, src, "lal_firbank", **properties)
501 
502 
503 def mkiirbank(pipeline, src, a1, b0, delay, name=None):
504  properties = dict((name, value) for name, value in (("name", name), ("delay_matrix", delay)) if value is not None)
505  if a1 is not None:
506  properties["a1_matrix"] = pipeio.repack_complex_array_to_real(a1)
507  if b0 is not None:
508  properties["b0_matrix"] = pipeio.repack_complex_array_to_real(b0)
509  elem = mkgeneric(pipeline, src, "lal_iirbank", **properties)
510  elem = mknofakedisconts(pipeline, elem) # FIXME: remove after basetransform behaviour fixed
511  return elem
512 
513 
514 def mkcudaiirbank(pipeline, src, a1, b0, delay, name=None):
515  properties = dict((name, value) for name, value in (("name", name), ("delay_matrix", delay)) if value is not None)
516  if a1 is not None:
517  properties["a1_matrix"] = pipeio.repack_complex_array_to_real(a1)
518  if b0 is not None:
519  properties["b0_matrix"] = pipeio.repack_complex_array_to_real(b0)
520  elem = mkgeneric(pipeline, src, "cuda_iirbank", **properties)
521  elem = mknofakedisconts(pipeline, elem) # FIXME: remove after basetransform behaviour fixed
522  return elem
523 
524 
525 def mkcudamultiratespiir(pipeline, src, bank_struct, bank_id=0, name=None):
526  properties = dict((name, value) for name, value in (("name", name), ("spiir_bank", bank_struct), ("bank_id", bank_id)) if value is not None)
527  elem = mkgeneric(pipeline, src, "cuda_multiratespiir", **properties)
528  return elem
529 
530 
531 def mktrim(pipeline, src, initial_offset = None, final_offset = None, inverse = None):
532  properties = dict((name, value) for name, value in zip(("initial-offset", "final-offset", "inverse"), (initial_offset,final_offset,inverse)) if value is not None)
533  return mkgeneric(pipeline, src, "lal_trim", **properties)
534 
535 
536 def mkmean(pipeline, src, **properties):
537  return mkgeneric(pipeline, src, "lal_mean", **properties)
538 
539 
540 def mkabs(pipeline, src, **properties):
541  return mkgeneric(pipeline, src, "abs", **properties)
542 
543 
544 def mkpow(pipeline, src, **properties):
545  return mkgeneric(pipeline, src, "pow", **properties)
546 
547 
548 ## Adds a <a href="@gstlalgtkdoc/GSTLALReblock.html">lal_reblock</a> element to a pipeline with useful default properties
549 def mkreblock(pipeline, src, **properties):
550  return mkgeneric(pipeline, src, "lal_reblock", **properties)
551 
552 
553 ## Adds a <a href="@gstlalgtkdoc/GSTLALSumSquares.html">lal_sumsquares</a> element to a pipeline with useful default properties
554 def mksumsquares(pipeline, src, weights = None):
555  if weights is not None:
556  return mkgeneric(pipeline, src, "lal_sumsquares", weights = weights)
557  else:
558  return mkgeneric(pipeline, src, "lal_sumsquares")
559 
560 
561 ## Adds a <a href="@gstlalgtkdoc/GSTLALGate.html">lal_gate</a> element to a pipeline with useful default properties
562 def mkgate(pipeline, src, threshold = None, control = None, **properties):
563  if threshold is not None:
564  elem = mkgeneric(pipeline, None, "lal_gate", threshold = threshold, **properties)
565  else:
566  elem = mkgeneric(pipeline, None, "lal_gate", **properties)
567  for peer, padname in ((src, "sink"), (control, "control")):
568  if isinstance(peer, gst.Pad):
569  peer.get_parent_element().link_pads(peer, elem, padname)
570  elif peer is not None:
571  peer.link_pads(None, elem, padname)
572  return elem
573 
574 
575 def mkbitvectorgen(pipeline, src, bit_vector, **properties):
576  return mkgeneric(pipeline, src, "lal_bitvectorgen", bit_vector = bit_vector, **properties)
577 
578 
579 ## Adds a <a href="@gstlalgtkdoc/GSTLALMatrixMixer.html">lal_matrixmixer</a> element to a pipeline with useful default properties
580 def mkmatrixmixer(pipeline, src, matrix = None):
581  if matrix is not None:
582  return mkgeneric(pipeline, src, "lal_matrixmixer", matrix = matrix)
583  else:
584  return mkgeneric(pipeline, src, "lal_matrixmixer")
585 
586 
587 ## Adds a <a href="@gstlalgtkdoc/GSTLALToggleComplex.html">lal_togglecomplex</a> element to a pipeline with useful default properties
588 def mktogglecomplex(pipeline, src):
589  return mkgeneric(pipeline, src, "lal_togglecomplex")
590 
591 
592 ## Adds a <a href="@gstlalgtkdoc/GSTLALAutoChiSq.html">lal_autochisq</a> element to a pipeline with useful default properties
593 def mkautochisq(pipeline, src, autocorrelation_matrix = None, mask_matrix = None, latency = 0, snr_thresh=0):
594  properties = {}
595  if autocorrelation_matrix is not None:
596  properties.update({
597  "autocorrelation_matrix": pipeio.repack_complex_array_to_real(autocorrelation_matrix),
598  "latency": latency,
599  "snr_thresh": snr_thresh
600  })
601  if mask_matrix is not None:
602  properties["autocorrelation_mask_matrix"] = mask_matrix
603  return mkgeneric(pipeline, src, "lal_autochisq", **properties)
604 
605 
606 ## Adds a <a href="@gstdoc/gstreamer-plugins-fakesink.html">fakesink</a> element to a pipeline with useful default properties
607 def mkfakesink(pipeline, src):
608  return mkgeneric(pipeline, src, "fakesink", sync = False, async = False)
609 
610 
611 ## Adds a <a href="@gstdoc/gstreamer-plugins-filesink.html">filesink</a> element to a pipeline with useful default properties
612 def mkfilesink(pipeline, src, filename):
613  return mkgeneric(pipeline, src, "filesink", sync = False, async = False, buffer_mode = 2, location = filename)
614 
615 
616 ## Adds a <a href="@gstlalgtkdoc/GstTSVEnc.html">lal_nxydump</a> element to a pipeline with useful default properties
617 def mknxydumpsink(pipeline, src, filename, segment = None):
618  if segment is not None:
619  elem = mkgeneric(pipeline, src, "lal_nxydump", start_time = segment[0].ns(), stop_time = segment[1].ns())
620  else:
621  elem = mkgeneric(pipeline, src, "lal_nxydump")
622  return mkfilesink(pipeline, elem, filename)
623 
624 
625 def mknxydumpsinktee(pipeline, src, *args, **properties):
626  t = mktee(pipeline, src)
627  mknxydumpsink(pipeline, mkqueue(pipeline, t), *args, **properties)
628  return t
629 
630 
631 def mkblcbctriggergen(pipeline, snr, chisq, template_bank_filename, snr_threshold, sigmasq):
632  # snr is complex and chisq is real so the correct source and sink
633  # pads will be selected automatically
634  elem = mkgeneric(pipeline, snr, "lal_blcbctriggergen", bank_filename = template_bank_filename, snr_thresh = snr_threshold, sigmasq = sigmasq)
635  chisq.link(elem)
636  return elem
637 
638 
639 def mktriggergen(pipeline, snr, chisq, template_bank_filename, snr_threshold, sigmasq):
640  # snr is complex and chisq is real so the correct source and sink
641  # pads will be selected automatically
642  elem = mkgeneric(pipeline, snr, "lal_triggergen", bank_filename = template_bank_filename, snr_thresh = snr_threshold, sigmasq = sigmasq)
643  chisq.link(elem)
644  return elem
645 
646 
647 def mktriggerxmlwritersink(pipeline, src, filename):
648  return mkgeneric(pipeline, src, "lal_triggerxmlwriter", sync = False, async = False, location = filename)
649 
650 
651 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-wavenc.html">wavenc</a> element to a pipeline with useful default properties
652 def mkwavenc(pipeline, src):
653  return mkgeneric(pipeline, src, "wavenc")
654 
655 
656 ## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-vorbisenc.html">vorbisenc</a> element to a pipeline with useful default properties
657 def mkvorbisenc(pipeline, src):
658  return mkgeneric(pipeline, src, "vorbisenc")
659 
660 
661 def mkcolorspace(pipeline, src):
662  return mkgeneric(pipeline, src, "ffmpegcolorspace")
663 
664 
665 def mktheoraenc(pipeline, src, **properties):
666  return mkgeneric(pipeline, src, "theoraenc", **properties)
667 
668 
669 def mkmpeg4enc(pipeline, src, **properties):
670  return mkgeneric(pipeline, src, "ffenc_mpeg4", **properties)
671 
672 
673 def mkoggmux(pipeline, src):
674  return mkgeneric(pipeline, src, "oggmux")
675 
676 
677 def mkavimux(pipeline, src):
678  return mkgeneric(pipeline, src, "avimux")
679 
680 
681 ## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audioconvert.html">audioconvert</a> element to a pipeline with useful default properties
682 def mkaudioconvert(pipeline, src, caps_string = None):
683  elem = mkgeneric(pipeline, src, "audioconvert")
684  if caps_string is not None:
685  elem = mkcapsfilter(pipeline, elem, caps_string)
686  return elem
687 
688 
689 ## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audiorate.html">audiorate</a> element to a pipeline with useful default properties
690 def mkaudiorate(pipeline, src, **properties):
691  return mkgeneric(pipeline, src, "audiorate", **properties)
692 
693 
694 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-flacenc.html">flacenc</a> element to a pipeline with useful default properties
695 def mkflacenc(pipeline, src, quality = 0, **properties):
696  return mkgeneric(pipeline, src, "flacenc", quality = quality, **properties)
697 
698 
699 def mkogmvideosink(pipeline, videosrc, filename, audiosrc = None, verbose = False):
700  src = mkcolorspace(pipeline, videosrc)
701  src = mkcapsfilter(pipeline, src, "video/x-raw-yuv, format=(fourcc)I420")
702  src = mktheoraenc(pipeline, src, border = 2, quality = 48, quick = False)
703  src = mkoggmux(pipeline, src)
704  if audiosrc is not None:
705  mkflacenc(pipeline, mkcapsfilter(pipeline, mkaudioconvert(pipeline, audiosrc), "audio/x-raw-int, width=32, depth=24")).link(src)
706  if verbose:
707  src = mkprogressreport(pipeline, src, filename)
708  mkfilesink(pipeline, src, filename)
709 
710 
711 def mkvideosink(pipeline, src):
712  return mkgeneric(pipeline, mkcolorspace(pipeline, src), "autovideosink")
713 
714 
715 ## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-autoaudiosink.html">autoaudiosink</a> element to a pipeline with useful default properties
716 def mkautoaudiosink(pipeline, src):
717  return mkgeneric(pipeline, mkqueue(pipeline, src), "autoaudiosink")
718 
719 
720 def mkplaybacksink(pipeline, src, amplification = 0.1):
721  elems = (
722  gst.element_factory_make("audioconvert"),
723  gst.element_factory_make("capsfilter"),
724  gst.element_factory_make("audioamplify"),
725  gst.element_factory_make("audioconvert"),
726  gst.element_factory_make("queue"),
727  gst.element_factory_make("autoaudiosink")
728  )
729  elems[1].set_property("caps", gst.Caps("audio/x-raw-float, width=64"))
730  elems[2].set_property("amplification", amplification)
731  elems[4].set_property("max-size-time", 1 * gst.SECOND)
732  pipeline.add(*elems)
733  gst.element_link_many(src, *elems)
734 
735 # FIXME no specific alias for this url since this library only has one element.
736 # DO NOT DOCUMENT OTHER CODES THIS WAY! Use @gstdoc @gstpluginsbasedoc etc.
737 ## Adds a <a href="http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-base-libs/html/gstreamer-app.html">appsink</a> element to a pipeline with useful default properties
738 def mkappsink(pipeline, src, max_buffers = 1, drop = False, **properties):
739  return mkgeneric(pipeline, src, "appsink", sync = False, async = False, emit_signals = True, max_buffers = max_buffers, drop = drop, **properties)
740 
741 
742 class AppSync(object):
743  def __init__(self, appsink_new_buffer, appsinks = []):
744  self.lock = threading.Lock()
745  # handler to invoke on availability of new time-ordered
746  # buffer
747  self.appsink_new_buffer = appsink_new_buffer
748  # element --> timestamp of current buffer or None if no
749  # buffer yet available
750  self.appsinks = {}
751  # set of sink elements that are currently at EOS
752  self.at_eos = set()
753  # do equivalent of .add_sink() for each pre-built appsink
754  # element provided at this time
755  for elem in appsinks:
756  if elem in self.appsinks:
757  raise ValueError("duplicate appsinks %s" % repr(elem))
758  elem.connect("new-buffer", self.appsink_handler, False)
759  elem.connect("eos", self.appsink_handler, True)
760  self.appsinks[elem] = None
761 
762  def add_sink(self, pipeline, src, drop = False, **properties):
763  # NOTE that max buffers must be 1 for this to work
764  assert "max_buffers" not in properties
765  elem = mkappsink(pipeline, src, max_buffers = 1, drop = drop, **properties)
766  elem.connect("new-buffer", self.appsink_handler, False)
767  elem.connect("eos", self.appsink_handler, True)
768  self.appsinks[elem] = None
769  return elem
770 
771  def appsink_handler(self, elem, eos):
772  with self.lock:
773  # update eos status, and retrieve buffer timestamp
774  if eos:
775  self.at_eos.add(elem)
776  else:
777  self.at_eos.discard(elem)
778  assert self.appsinks[elem] is None
779  self.appsinks[elem] = elem.get_last_buffer().timestamp
780 
781  # keep looping while we can process buffers
782  while True:
783  # retrieve the timestamps of all elements that
784  # aren't at eos and all elements at eos that still
785  # have buffers in them
786  timestamps = [(t, e) for e, t in self.appsinks.items() if e not in self.at_eos or t is not None]
787  # nothing to do if all elements are at eos and do
788  # not have buffers
789  if not timestamps:
790  break
791  # find the element with the oldest timestamp. None
792  # compares as less than everything, so we'll find
793  # any element (that isn't at eos) that doesn't yet
794  # have a buffer (elements at eos and that are
795  # without buffers aren't in the list)
796  timestamp, elem_with_oldest = min(timestamps)
797  # if there's an element without a buffer, do
798  # nothing --- we require all non-eos elements to
799  # have buffers before proceding
800  if timestamp is None:
801  break
802  # clear timestamp and pass element to
803  # handler func. function call is done last
804  # so that all of our book-keeping has been
805  # taken care of in case an exception gets
806  # raised
807  self.appsinks[elem_with_oldest] = None
808  self.appsink_new_buffer(elem_with_oldest)
809 
810 
811 def connect_appsink_dump_dot(pipeline, appsinks, basename, verbose = False):
812 
813  """
814  add a signal handler to write a pipeline graph upon receipt of the
815  first trigger buffer. the caps in the pipeline graph are not fully
816  negotiated until data comes out the end, so this version of the graph
817  shows the final formats on all links
818  """
819 
820  class AppsinkDumpDot(object):
821  # data shared by all instances
822  # number of times execute method has been invoked, and a mutex
823  n_lock = threading.Lock()
824  n = 0
825 
826  def __init__(self, pipeline, write_after, basename, verbose = False):
827  self.pipeline = pipeline
828  self.handler_id = None
829  self.write_after = write_after
830  self.filestem = "%s.%s" % (basename, "TRIGGERS")
831  self.verbose = verbose
832 
833  def execute(self, elem):
834  self.n_lock.acquire()
835  try:
836  type(self).n += 1
837  if self.n >= self.write_after:
838  write_dump_dot(self.pipeline, self.filestem, verbose = self.verbose)
839  finally:
840  self.n_lock.release()
841  elem.disconnect(self.handler_id)
842 
843  for sink in appsinks:
844  appsink_dump_dot = AppsinkDumpDot(pipeline, len(appsinks), basename = basename, verbose = verbose)
845  appsink_dump_dot.handler_id = sink.connect_after("new-buffer", appsink_dump_dot.execute)
846 
847 
848 def mkchecktimestamps(pipeline, src, name = None, silent = True, timestamp_fuzz = 1):
849  return mkgeneric(pipeline, src, "lal_checktimestamps", name = name, silent = silent, timestamp_fuzz = timestamp_fuzz)
850 
851 
852 ## Adds a <a href="@gstlalgtkdoc/GSTLALPeak.html">lal_peak</a> element to a pipeline with useful default properties
853 def mkpeak(pipeline, src, n):
854  return mkgeneric(pipeline, src, "lal_peak", n = n)
855 
856 
857 def mkitac(pipeline, src, n, bank, autocorrelation_matrix = None, mask_matrix = None, snr_thresh = 0, sigmasq = None):
858  properties = {
859  "n": n,
860  "bank_filename": bank,
861  "snr_thresh": snr_thresh
862  }
863  if autocorrelation_matrix is not None:
864  properties["autocorrelation_matrix"] = pipeio.repack_complex_array_to_real(autocorrelation_matrix)
865  if mask_matrix is not None:
866  properties["autocorrelation_mask"] = mask_matrix
867  if sigmasq is not None:
868  properties["sigmasq"] = sigmasq
869  return mkgeneric(pipeline, src, "lal_itac", **properties)
870 
871 
872 def mklhocoherentnull(pipeline, H1src, H2src, H1_impulse, H1_latency, H2_impulse, H2_latency, srate):
873  elem = mkgeneric(pipeline, None, "lal_lho_coherent_null", block_stride = srate, H1_impulse = H1_impulse, H2_impulse = H2_impulse, H1_latency = H1_latency, H2_latency = H2_latency)
874  for peer, padname in ((H1src, "H1sink"), (H2src, "H2sink")):
875  if isinstance(peer, gst.Pad):
876  peer.get_parent_element().link_pads(peer, elem, padname)
877  elif peer is not None:
878  peer.link_pads(None, elem, padname)
879  return elem
880 
881 def mkcomputegamma(pipeline, dctrl, exc, cos, sin, **properties):
882  elem = mkgeneric(pipeline, None, "lal_compute_gamma", **properties)
883  for peer, padname in ((dctrl, "dctrl_sink"), (exc, "exc_sink"), (cos, "cos"), (sin, "sin")):
884  if isinstance(peer, gst.Pad):
885  peer.get_parent_element().link_pads(peer, elem, padname)
886  elif peer is not None:
887  peer.link_pads(None, elem, padname)
888  return elem
889 
890 def mkbursttriggergen(pipeline, src, **properties):
891  return mkgeneric(pipeline, src, "lal_bursttriggergen", **properties)
892 
893 def mkodctodqv(pipeline, src, **properties):
894  return mkgeneric(pipeline, src, "lal_odc_to_dqv", **properties)
895 
896 def mktcpserversink(pipeline, src, **properties):
897  # units_soft_max = 1 GB
898  # FIXME: are these sensible defaults?
899  return mkgeneric(pipeline, src, "tcpserversink", sync = True, sync_method = "latest-keyframe", recover_policy = "keyframe", unit_type = "bytes", units_soft_max = 1024**3, **properties)
900 
901 
902 def audioresample_variance_gain(quality, num, den):
903  """Calculate the output gain of GStreamer's stock audioresample element.
904 
905  The audioresample element has a frequency response of unity "almost" all the
906  way up the Nyquist frequency. However, for an input of unit variance
907  Gaussian noise, the output will have a variance very slighly less than 1.
908  The return value is the variance that the filter will produce for a given
909  "quality" setting and sample rate.
910 
911  @param den The denomenator of the ratio of the input and output sample rates
912  @param num The numerator of the ratio of the input and output sample rates
913  @return The variance of the output signal for unit variance input
914 
915  The following example shows how to apply the correction factor using an
916  audioamplify element.
917 
918  >>> from gstlal.pipeutil import *
919  >>> from gstlal.pipeparts import audioresample_variance_gain
920  >>> from gstlal import pipeio
921  >>> import numpy
922  >>> nsamples = 2 ** 17
923  >>> num = 2
924  >>> den = 1
925  >>> def handoff_handler(element, buffer, pad, (quality, filt_len, num, den)):
926  ... out_latency = numpy.ceil(float(den) / num * filt_len)
927  ... buf = pipeio.array_from_audio_buffer(buffer).flatten()
928  ... std = numpy.std(buf[out_latency:-out_latency])
929  ... print "quality=%2d, filt_len=%3d, num=%d, den=%d, stdev=%.2f" % (
930  ... quality, filt_len, num, den, std)
931  ...
932  >>> for quality in range(11):
933  ... pipeline = gst.Pipeline()
934  ... correction = 1/numpy.sqrt(audioresample_variance_gain(quality, num, den))
935  ... elems = mkelems_in_bin(pipeline,
936  ... ('audiotestsrc', {'wave':'gaussian-noise','volume':1}),
937  ... ('capsfilter', {'caps':gst.Caps('audio/x-raw-float,width=64,rate=%d' % num)}),
938  ... ('audioresample', {'quality':quality}),
939  ... ('capsfilter', {'caps':gst.Caps('audio/x-raw-float,width=64,rate=%d' % den)}),
940  ... ('audioamplify', {'amplification':correction,'clipping-method':'none'}),
941  ... ('fakesink', {'signal-handoffs':True, 'num-buffers':1})
942  ... )
943  ... filt_len = elems[2].get_property('filter-length')
944  ... elems[0].set_property('samplesperbuffer', 2 * filt_len + nsamples)
945  ... if elems[-1].connect_after('handoff', handoff_handler, (quality, filt_len, num, den)) < 1:
946  ... raise RuntimeError
947  ... try:
948  ... if pipeline.set_state(gst.STATE_PLAYING) is not gst.STATE_CHANGE_ASYNC:
949  ... raise RuntimeError
950  ... if not pipeline.get_bus().poll(gst.MESSAGE_EOS, -1):
951  ... raise RuntimeError
952  ... finally:
953  ... if pipeline.set_state(gst.STATE_NULL) is not gst.STATE_CHANGE_SUCCESS:
954  ... raise RuntimeError
955  ...
956  quality= 0, filt_len= 8, num=2, den=1, stdev=1.00
957  quality= 1, filt_len= 16, num=2, den=1, stdev=1.00
958  quality= 2, filt_len= 32, num=2, den=1, stdev=1.00
959  quality= 3, filt_len= 48, num=2, den=1, stdev=1.00
960  quality= 4, filt_len= 64, num=2, den=1, stdev=1.00
961  quality= 5, filt_len= 80, num=2, den=1, stdev=1.00
962  quality= 6, filt_len= 96, num=2, den=1, stdev=1.00
963  quality= 7, filt_len=128, num=2, den=1, stdev=1.00
964  quality= 8, filt_len=160, num=2, den=1, stdev=1.00
965  quality= 9, filt_len=192, num=2, den=1, stdev=1.00
966  quality=10, filt_len=256, num=2, den=1, stdev=1.00
967  """
968 
969  # These constants were measured with 2**22 samples.
970 
971  if num > den: # downsampling
972  return den * (
973  0.7224862140943990596,
974  0.7975021342935247892,
975  0.8547537598970208483,
976  0.8744072146753004704,
977  0.9075294214410336568,
978  0.9101523813406768859,
979  0.9280549396020538744,
980  0.9391809530012216189,
981  0.9539276644089494939,
982  0.9623083437067311285,
983  0.9684700588501590213
984  )[quality] / num
985  elif num < den: # upsampling
986  return (
987  0.7539740617648067467,
988  0.8270076656536116122,
989  0.8835072979478705291,
990  0.8966758456219333651,
991  0.9253434087537378838,
992  0.9255866674042573239,
993  0.9346487800036394900,
994  0.9415331868209220190,
995  0.9524608799160205752,
996  0.9624372769883490220,
997  0.9704505626409354324
998  )[quality]
999  else: # no change in sample rate
1000  return 1.
1001 
1002 
1003 #
1004 # =============================================================================
1005 #
1006 # Debug utilities
1007 #
1008 # =============================================================================
1009 #
1010 
1011 
1012 def write_dump_dot(pipeline, filestem, verbose = False):
1013  """
1014  This function needs the environment variable GST_DEBUG_DUMP_DOT_DIR
1015  to be set. The filename will be
1016 
1017  os.path.join($GST_DEBUG_DUMP_DOT_DIR, filestem + ".dot")
1018 
1019  If verbose is True, a message will be written to stderr.
1020  """
1021  if "GST_DEBUG_DUMP_DOT_DIR" not in os.environ:
1022  raise ValueError("cannot write pipeline, environment variable GST_DEBUG_DUMP_DOT_DIR is not set")
1023  gst.DEBUG_BIN_TO_DOT_FILE(pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filestem)
1024  if verbose:
1025  print >>sys.stderr, "Wrote pipeline to %s" % os.path.join(os.environ["GST_DEBUG_DUMP_DOT_DIR"], "%s.dot" % filestem)