36 gobject.threads_init()
42 from glue
import iterutils
44 from glue
import segments
45 from gstlal
import pipeio
46 from pylal.xlal.datatypes.ligotimegps
import LIGOTimeGPS
49 __author__ =
"Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>, Drew Keppel <drew.keppel@ligo.org>"
83 def mkgeneric(pipeline, src, elem_type_name, **properties):
84 if "name" in properties:
85 elem = gst.element_factory_make(elem_type_name, properties.pop(
"name"))
87 elem = gst.element_factory_make(elem_type_name)
88 for name, value
in properties.items():
89 elem.set_property(name.replace(
"_",
"-"), value)
91 if isinstance(src, gst.Pad):
92 src.get_parent_element().link_pads(src, elem,
None)
105 A class that manages the task of watching for and connecting to new
106 source pads by name. The inputs are an element, the name of the
107 source pad to watch for on that element, and the sink pad (on a
108 different element) to which the source pad should be linked when it
111 The "pad-added" signal of the element will be used to watch for new
112 pads, and if the "no-more-pads" signal is emitted by the element
113 before the requested pad has appeared ValueError is raised.
def __init__(self, element, srcpadname, sinkpad):
    """
    Hook the "no-more-pads" and "pad-added" signals of element.

    srcpadname is passed as user data to the "no-more-pads" handler.
    The "pad-added" handler receives a list holding srcpadname,
    sinkpad and the ids of both signal handlers.
    """
    no_more_id = element.connect("no-more-pads", self.no_more_pads, srcpadname)
    assert no_more_id > 0

    # The same list object is handed to connect() and extended
    # afterwards, because the "pad-added" handler id is only known
    # once connect() has returned.
    handler_data = [srcpadname, sinkpad, no_more_id]
    added_id = element.connect("pad-added", self.pad_added, handler_data)
    assert added_id > 0
    handler_data.append(added_id)
124 def pad_added(element, pad, (srcpadname, sinkpad, no_more_pads_handler_id, pad_added_handler_id)):
125 if pad.get_name() == srcpadname:
126 element.handler_disconnect(no_more_pads_handler_id)
127 element.handler_disconnect(pad_added_handler_id)
def no_more_pads(element, srcpadname):
    """
    Signal handler: always raises ValueError naming the element (via
    element.get_name()) and the pad name srcpadname.
    """
    message = "<%s>: no pad named '%s'" % (element.get_name(), srcpadname)
    raise ValueError(message)
143 Connect a handler for the pad-added signal of the
144 framecpp_channeldemux element elem, and when a pad is added
145 to the element if the pad's name appears as a key in the
146 units_dict dictionary that pad's units property will be set
147 to the string value associated with that key in the
152 >>> framecpp_channeldemux_set_units(elem, {"H1:LSC-STRAIN": "strain"})
154 NOTE: this is a work-around to address the problem that
155 most (all?) frame files do not have units set on their
156 channel data, whereas downstream consumers of the data
157 might require information about the units. The demuxer
158 provides the units as part of a tag event, and
159 framecpp_channeldemux_set_units() can be used to override
160 the values, thereby correcting absent or incorrect units
def pad_added(element, pad, units_dict):
    """
    Signal handler: if the name of pad is a key of units_dict, set the
    pad's "units" property to the associated value; otherwise do
    nothing.  element is not used.
    """
    pad_name = pad.get_name()
    if pad_name not in units_dict:
        return
    pad.set_property("units", units_dict[pad_name])
174 def __init__(self, elem, seglists, jitter = LIGOTimeGPS(0, 1)):
182 def pad_added(self, element, pad, seglists):
183 name = pad.get_name()
185 pad.remove_data_probe(self.probe_handler_ids.pop(name))
190 def set_probe(pad, seglist, jitter = LIGOTimeGPS(0, 1)):
192 return pad.add_data_probe(framecpp_channeldemux_check_segments.probe, (segments.segmentlist(seglist), jitter))
195 def probe(pad, obj, (seglist, jitter)):
196 if isinstance(obj, gst.Buffer):
197 if not obj.flag_is_set(gst.BUFFER_FLAG_GAP):
200 seglist -= segments.segmentlist([segments.segment((LIGOTimeGPS(0, obj.timestamp), LIGOTimeGPS(0, obj.timestamp + obj.duration)))])
203 iterutils.inplace_filter(
lambda seg: abs(seg) > jitter, seglist)
206 preceding = segments.segment((segments.NegInfinity, LIGOTimeGPS(0, obj.timestamp)))
207 if seglist.intersects_segment(preceding):
208 raise ValueError(
"%s: detected missing data: %s" % (pad.get_name(), seglist & segments.segmentlist([preceding])))
209 elif isinstance(obj, gst.Event)
and obj.type == gst.EVENT_EOS:
211 raise ValueError(
"%s: at EOS detected missing data: %s" % (pad.get_name(), seglist))
224 >>> filesinkelem.connect("notify::timestamp", framecpp_filesink_ldas_path_handler, (".", 5))
227 timestamp = elem.get_property(
"timestamp") // gst.SECOND
230 leading_digits = timestamp // 10**int(math.log10(timestamp) + 1 - dir_digits)
233 instrument = elem.get_property(
"instrument")
234 frame_type = elem.get_property(
"frame-type")
237 path = os.path.join(outpath,
"%s-%s-%d" % (instrument, frame_type, leading_digits))
238 if not os.path.exists(path):
240 elem.set_property(
"path", path)
245 Translate an element message posted by the multifilesink element
246 inside a framecpp_filesink bin into a lal.CacheEntry object
247 describing the file being written by the multifilesink element.
250 start = LIGOTimeGPS(0, message.structure[
"timestamp"])
251 end = start + LIGOTimeGPS(0, message.structure[
"duration"])
255 parent = message.src.get_parent()
258 return lal.CacheEntry(parent.get_property(
"instrument"), parent.get_property(
"frame-type"), segments.segment(start, end),
"file://localhost%s" % os.path.abspath(message.structure[
"filename"]))
def mkchannelgram(pipeline, src, **properties):
    """
    Build a "lal_channelgram" element by delegating to mkgeneric().

    pipeline and src are handed to mkgeneric() unchanged and the extra
    keyword arguments are forwarded as element properties.  Returns
    whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "lal_channelgram", **properties)
    return element
def mkspectrumplot(pipeline, src, **properties):
    """
    Build a "lal_spectrumplot" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "lal_spectrumplot", **properties)
    return element
def mkhistogram(pipeline, src):
    """
    Build a "lal_histogramplot" element by delegating to mkgeneric().

    No element properties are set.  Returns whatever mkgeneric()
    returns.
    """
    element = mkgeneric(pipeline, src, "lal_histogramplot")
    return element
def mksegmentsrc(pipeline, segment_list, blocksize = 4096 * 1 * 1, invert_output = False):
    """
    Build a "lal_segmentsrc" element (no upstream source) via
    mkgeneric().

    Each (a, b) pair of segment_list is converted with a.ns() and
    b.ns() into a segments.segment, and the resulting
    segments.segmentlist is passed as the segment_list property
    together with blocksize and invert_output.  Returns whatever
    mkgeneric() returns.
    """
    converted = segments.segmentlist(
        segments.segment(start.ns(), stop.ns()) for start, stop in segment_list
    )
    return mkgeneric(
        pipeline,
        None,
        "lal_segmentsrc",
        blocksize = blocksize,
        segment_list = converted,
        invert_output = invert_output
    )
291 return mkgeneric(pipeline,
None,
"lal_cachesrc", location = location, use_mmap = use_mmap, **properties)
def mklvshmsrc(pipeline, shm_name, **properties):
    """
    Build a "gds_lvshmsrc" element (no upstream source) via
    mkgeneric().

    shm_name and any extra keyword arguments are forwarded as element
    properties.  Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(
        pipeline,
        None,
        "gds_lvshmsrc",
        shm_name = shm_name,
        **properties
    )
    return element
def mkframexmitsrc(pipeline, multicast_group, port, **properties):
    """
    Build a "gds_framexmitsrc" element (no upstream source) via
    mkgeneric().

    multicast_group, port and any extra keyword arguments are
    forwarded as element properties.  Returns whatever mkgeneric()
    returns.
    """
    element = mkgeneric(
        pipeline,
        None,
        "gds_framexmitsrc",
        multicast_group = multicast_group,
        port = port,
        **properties
    )
    return element
def mkigwdparse(pipeline, src, **properties):
    """
    Build a "framecpp_igwdparse" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "framecpp_igwdparse", **properties)
    return element
def mkuridecodebin(pipeline, uri, caps = "application/x-igwd-frame,framed=true", **properties):
    """
    Build a "uridecodebin" element (no upstream source) via
    mkgeneric().

    caps is wrapped in gst.Caps() unless it is None, in which case
    None is passed through as the caps property.  uri and any extra
    keyword arguments are forwarded as element properties.  Returns
    whatever mkgeneric() returns.
    """
    if caps is not None:
        caps = gst.Caps(caps)
    return mkgeneric(pipeline, None, "uridecodebin", uri = uri, caps = caps, **properties)
def mkframecppchanneldemux(pipeline, src, **properties):
    """
    Build a "framecpp_channeldemux" element by delegating to
    mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "framecpp_channeldemux", **properties)
    return element
315 def mkframecppchannelmux(pipeline, channel_src_map, units = None, seglists = None, **properties):
316 elem = mkgeneric(pipeline,
None,
"framecpp_channelmux", **properties)
317 if channel_src_map
is not None:
318 for channel, src
in channel_src_map.items():
319 for srcpad
in src.src_pads():
320 if srcpad.link(elem.get_pad(channel)) == gst.PAD_LINK_OK:
322 if units
is not None:
324 if seglists
is not None:
329 def mkframecppfilesink(pipeline, src, message_forward = True, **properties):
330 post_messages = properties.pop(
"post_messages",
True)
331 elem = mkgeneric(pipeline, src,
"framecpp_filesink", message_forward = message_forward, **properties)
335 elem.get_by_name(
"multifilesink").set_property(
"post-messages", post_messages)
340 def mkmultifilesink(pipeline, src, next_file = 0, sync = False, async = False, **properties):
341 return mkgeneric(pipeline, src,
"multifilesink", next_file = next_file, sync = sync, async = async, **properties)
def mkndssrc(pipeline, host, instrument, channel_name, channel_type, blocksize = 16384 * 8 * 1, port = 31200):
    """
    Build an "ndssrc" element (no upstream source) via mkgeneric().

    The channel_name property is set to "<instrument>:<channel_name>".
    blocksize, port, host and channel_type are forwarded unchanged.
    Returns whatever mkgeneric() returns.
    """
    full_channel_name = "%s:%s" % (instrument, channel_name)
    return mkgeneric(
        pipeline,
        None,
        "ndssrc",
        blocksize = blocksize,
        port = port,
        host = host,
        channel_name = full_channel_name,
        channel_type = channel_type
    )
352 return mkgeneric(pipeline, src,
"capsfilter", caps = gst.Caps(caps))
357 return mkgeneric(pipeline, src,
"capssetter", caps = gst.Caps(caps), **properties)
362 return mkgeneric(pipeline, src,
"lal_statevector", **properties)
367 return mkgeneric(pipeline, src,
"taginject", tags = tags)
372 return mkgeneric(pipeline,
None,
"audiotestsrc", **properties)
def mkfakesrc(pipeline, instrument, channel_name, blocksize = None, volume = 1e-20, is_live = False, wave = 9, rate = 16384):
    """
    Build a chain audiotestsrc -> capsfilter -> taginject.

    The chain is assembled with mkaudiotestsrc(), mkcapsfilter() and
    mktaginject(); the result of mktaginject() is returned.

    blocksize defaults to 1 * rate * 8 when None.  The test source is
    given samplesperbuffer = blocksize // 8, the caps filter is set to
    "audio/x-raw-float, width=64" at the requested rate, and the tags
    carry instrument, channel_name and units=strain.  wave, volume and
    is_live are forwarded to mkaudiotestsrc().
    """
    if blocksize is None:
        # presumably one second of 8-byte (width=64) samples -- TODO confirm
        blocksize = 1 * rate * 8

    # Floor division keeps the sample count an integer even when true
    # division is in effect (Python 3, or "from __future__ import
    # division"); for integers under classic division it is identical.
    src = mkaudiotestsrc(
        pipeline,
        samplesperbuffer = blocksize // 8,
        wave = wave,
        volume = volume,
        is_live = is_live
    )
    src = mkcapsfilter(pipeline, src, "audio/x-raw-float, width=64, rate=%d" % rate)
    return mktaginject(pipeline, src, "instrument=%s,channel-name=%s,units=strain" % (instrument, channel_name))
386 properties.update((name, val)
for name, val
in ((
"kernel", kernel), (
"latency", latency))
if val
is not None)
387 return mkgeneric(pipeline, src,
"audiofirfilter", **properties)
394 return mkgeneric(pipeline, src,
"audioiirfilter", a = a, b = b)
399 return mkgeneric(pipeline, src,
"lal_shift", **properties)
def mkfakeLIGOsrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
    """
    Build a "lal_fakeligosrc" element (no upstream source) via
    mkgeneric().

    blocksize is always set; instrument and channel_name are set only
    when they are not None.  location is accepted but not used by
    this function.  Returns whatever mkgeneric() returns.
    """
    properties = {"blocksize": blocksize}
    if instrument is not None:
        properties["instrument"] = instrument
    if channel_name is not None:
        properties["channel_name"] = channel_name
    return mkgeneric(pipeline, None, "lal_fakeligosrc", **properties)
def mkfakeadvLIGOsrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
    """
    Build a "lal_fakeadvligosrc" element (no upstream source) via
    mkgeneric().

    blocksize is always set; instrument and channel_name are set only
    when they are not None.  location is accepted but not used by
    this function.  Returns whatever mkgeneric() returns.
    """
    properties = {"blocksize": blocksize}
    optional = (("instrument", instrument), ("channel_name", channel_name))
    for key, setting in optional:
        if setting is not None:
            properties[key] = setting
    return mkgeneric(pipeline, None, "lal_fakeadvligosrc", **properties)
def mkfakeadvvirgosrc(pipeline, location = None, instrument = None, channel_name = None, blocksize = 16384 * 8 * 1):
    """
    Build a "lal_fakeadvvirgosrc" element (no upstream source) via
    mkgeneric().

    blocksize is always set; instrument and channel_name are set only
    when they are not None.  location is accepted but not used by
    this function.  Returns whatever mkgeneric() returns.
    """
    properties = {"blocksize": blocksize}
    properties.update(
        (key, setting)
        for key, setting in (("instrument", instrument), ("channel_name", channel_name))
        if setting is not None
    )
    return mkgeneric(pipeline, None, "lal_fakeadvvirgosrc", **properties)
425 return mkgeneric(pipeline, src,
"progressreport", do_query =
False, name = name)
430 return mkgeneric(pipeline, src,
"lal_simulation", xml_location = filename)
435 return mkgeneric(pipeline, src,
"audiochebband", lower_frequency = lower_frequency, upper_frequency = upper_frequency, poles = poles)
440 return mkgeneric(pipeline, src,
"audiocheblimit", cutoff = cutoff, mode = mode, poles = poles)
445 return mkgeneric(pipeline, src,
"audioamplify", clipping_method = 3, amplification = amplification)
450 return mkgeneric(pipeline, src,
"lal_audioundersample")
455 return mkgeneric(pipeline, src,
"audioresample", **properties)
460 return mkgeneric(pipeline, src,
"lal_interpolator", **properties)
def mkwhiten(pipeline, src, psd_mode = 0, zero_pad = 0, fft_length = 8, average_samples = 64, median_samples = 7, **properties):
    """
    Build a "lal_whiten" element by delegating to mkgeneric().

    psd_mode, zero_pad, fft_length, average_samples, median_samples
    and any extra keyword arguments are forwarded as element
    properties.  Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(
        pipeline,
        src,
        "lal_whiten",
        psd_mode = psd_mode,
        zero_pad = zero_pad,
        fft_length = fft_length,
        average_samples = average_samples,
        median_samples = median_samples,
        **properties
    )
    return element
470 return mkgeneric(pipeline, src,
"tee")
474 def mkadder(pipeline, srcs, sync = True, **properties):
475 elem = mkgeneric(pipeline,
None,
"lal_adder", sync = sync, **properties)
484 return mkgeneric(pipeline, src,
"queue", **properties)
def mkdrop(pipeline, src, drop_samples = 0):
    """
    Build a "lal_drop" element by delegating to mkgeneric().

    drop_samples is forwarded as an element property.  Returns
    whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "lal_drop", drop_samples = drop_samples)
    return element
494 return mkgeneric(pipeline, src,
"lal_nofakedisconts", silent = silent)
def mkfirbank(pipeline, src, latency = None, fir_matrix = None, time_domain = None, block_stride = None):
    """
    Build a "lal_firbank" element by delegating to mkgeneric().

    Only those of latency, fir_matrix, time_domain and block_stride
    that are not None are forwarded as element properties.  Returns
    whatever mkgeneric() returns.
    """
    candidates = (
        ("latency", latency),
        ("fir_matrix", fir_matrix),
        ("time_domain", time_domain),
        ("block_stride", block_stride),
    )
    properties = {}
    for key, setting in candidates:
        if setting is not None:
            properties[key] = setting
    return mkgeneric(pipeline, src, "lal_firbank", **properties)
503 def mkiirbank(pipeline, src, a1, b0, delay, name=None):
504 properties = dict((name, value)
for name, value
in ((
"name", name), (
"delay_matrix", delay))
if value
is not None)
506 properties[
"a1_matrix"] = pipeio.repack_complex_array_to_real(a1)
508 properties[
"b0_matrix"] = pipeio.repack_complex_array_to_real(b0)
509 elem = mkgeneric(pipeline, src,
"lal_iirbank", **properties)
514 def mkcudaiirbank(pipeline, src, a1, b0, delay, name=None):
515 properties = dict((name, value)
for name, value
in ((
"name", name), (
"delay_matrix", delay))
if value
is not None)
517 properties[
"a1_matrix"] = pipeio.repack_complex_array_to_real(a1)
519 properties[
"b0_matrix"] = pipeio.repack_complex_array_to_real(b0)
520 elem = mkgeneric(pipeline, src,
"cuda_iirbank", **properties)
525 def mkcudamultiratespiir(pipeline, src, bank_struct, bank_id=0, name=None):
526 properties = dict((name, value)
for name, value
in ((
"name", name), (
"spiir_bank", bank_struct), (
"bank_id", bank_id))
if value
is not None)
527 elem = mkgeneric(pipeline, src,
"cuda_multiratespiir", **properties)
def mktrim(pipeline, src, initial_offset = None, final_offset = None, inverse = None):
    """
    Build a "lal_trim" element by delegating to mkgeneric().

    Only those of initial_offset, final_offset and inverse that are
    not None are forwarded, under the property names
    "initial-offset", "final-offset" and "inverse".  Returns whatever
    mkgeneric() returns.
    """
    properties = {}
    if initial_offset is not None:
        properties["initial-offset"] = initial_offset
    if final_offset is not None:
        properties["final-offset"] = final_offset
    if inverse is not None:
        properties["inverse"] = inverse
    return mkgeneric(pipeline, src, "lal_trim", **properties)
def mkmean(pipeline, src, **properties):
    """
    Build a "lal_mean" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "lal_mean", **properties)
    return element
def mkabs(pipeline, src, **properties):
    """
    Build an "abs" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "abs", **properties)
    return element
def mkpow(pipeline, src, **properties):
    """
    Build a "pow" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "pow", **properties)
    return element
550 return mkgeneric(pipeline, src,
"lal_reblock", **properties)
555 if weights
is not None:
556 return mkgeneric(pipeline, src,
"lal_sumsquares", weights = weights)
558 return mkgeneric(pipeline, src,
"lal_sumsquares")
562 def mkgate(pipeline, src, threshold = None, control = None, **properties):
563 if threshold
is not None:
564 elem = mkgeneric(pipeline,
None,
"lal_gate", threshold = threshold, **properties)
566 elem = mkgeneric(pipeline,
None,
"lal_gate", **properties)
567 for peer, padname
in ((src,
"sink"), (control,
"control")):
568 if isinstance(peer, gst.Pad):
569 peer.get_parent_element().link_pads(peer, elem, padname)
570 elif peer
is not None:
571 peer.link_pads(
None, elem, padname)
def mkbitvectorgen(pipeline, src, bit_vector, **properties):
    """
    Build a "lal_bitvectorgen" element by delegating to mkgeneric().

    bit_vector and any extra keyword arguments are forwarded as
    element properties.  Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(
        pipeline,
        src,
        "lal_bitvectorgen",
        bit_vector = bit_vector,
        **properties
    )
    return element
581 if matrix
is not None:
582 return mkgeneric(pipeline, src,
"lal_matrixmixer", matrix = matrix)
584 return mkgeneric(pipeline, src,
"lal_matrixmixer")
589 return mkgeneric(pipeline, src,
"lal_togglecomplex")
593 def mkautochisq(pipeline, src, autocorrelation_matrix = None, mask_matrix = None, latency = 0, snr_thresh=0):
595 if autocorrelation_matrix
is not None:
597 "autocorrelation_matrix": pipeio.repack_complex_array_to_real(autocorrelation_matrix),
599 "snr_thresh": snr_thresh
601 if mask_matrix
is not None:
602 properties[
"autocorrelation_mask_matrix"] = mask_matrix
603 return mkgeneric(pipeline, src,
"lal_autochisq", **properties)
608 return mkgeneric(pipeline, src,
"fakesink", sync =
False, async =
False)
613 return mkgeneric(pipeline, src,
"filesink", sync =
False, async =
False, buffer_mode = 2, location = filename)
618 if segment
is not None:
619 elem = mkgeneric(pipeline, src,
"lal_nxydump", start_time = segment[0].ns(), stop_time = segment[1].ns())
621 elem = mkgeneric(pipeline, src,
"lal_nxydump")
625 def mknxydumpsinktee(pipeline, src, *args, **properties):
626 t =
mktee(pipeline, src)
631 def mkblcbctriggergen(pipeline, snr, chisq, template_bank_filename, snr_threshold, sigmasq):
634 elem = mkgeneric(pipeline, snr,
"lal_blcbctriggergen", bank_filename = template_bank_filename, snr_thresh = snr_threshold, sigmasq = sigmasq)
639 def mktriggergen(pipeline, snr, chisq, template_bank_filename, snr_threshold, sigmasq):
642 elem = mkgeneric(pipeline, snr,
"lal_triggergen", bank_filename = template_bank_filename, snr_thresh = snr_threshold, sigmasq = sigmasq)
647 def mktriggerxmlwritersink(pipeline, src, filename):
648 return mkgeneric(pipeline, src,
"lal_triggerxmlwriter", sync =
False, async =
False, location = filename)
653 return mkgeneric(pipeline, src,
"wavenc")
658 return mkgeneric(pipeline, src,
"vorbisenc")
def mkcolorspace(pipeline, src):
    """
    Build an "ffmpegcolorspace" element by delegating to mkgeneric().

    No element properties are set.  Returns whatever mkgeneric()
    returns.
    """
    element = mkgeneric(pipeline, src, "ffmpegcolorspace")
    return element
def mktheoraenc(pipeline, src, **properties):
    """
    Build a "theoraenc" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "theoraenc", **properties)
    return element
def mkmpeg4enc(pipeline, src, **properties):
    """
    Build an "ffenc_mpeg4" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "ffenc_mpeg4", **properties)
    return element
def mkoggmux(pipeline, src):
    """
    Build an "oggmux" element by delegating to mkgeneric().

    No element properties are set.  Returns whatever mkgeneric()
    returns.
    """
    element = mkgeneric(pipeline, src, "oggmux")
    return element
def mkavimux(pipeline, src):
    """
    Build an "avimux" element by delegating to mkgeneric().

    No element properties are set.  Returns whatever mkgeneric()
    returns.
    """
    element = mkgeneric(pipeline, src, "avimux")
    return element
683 elem = mkgeneric(pipeline, src,
"audioconvert")
684 if caps_string
is not None:
691 return mkgeneric(pipeline, src,
"audiorate", **properties)
def mkflacenc(pipeline, src, quality = 0, **properties):
    """
    Build a "flacenc" element by delegating to mkgeneric().

    quality and any extra keyword arguments are forwarded as element
    properties.  Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "flacenc", quality = quality, **properties)
    return element
699 def mkogmvideosink(pipeline, videosrc, filename, audiosrc = None, verbose = False):
700 src = mkcolorspace(pipeline, videosrc)
701 src =
mkcapsfilter(pipeline, src,
"video/x-raw-yuv, format=(fourcc)I420")
702 src = mktheoraenc(pipeline, src, border = 2, quality = 48, quick =
False)
703 src = mkoggmux(pipeline, src)
704 if audiosrc
is not None:
def mkvideosink(pipeline, src):
    """
    Feed src through mkcolorspace() and attach an "autovideosink"
    element to the result via mkgeneric().

    Returns whatever mkgeneric() returns for the sink.
    """
    converted = mkcolorspace(pipeline, src)
    return mkgeneric(pipeline, converted, "autovideosink")
717 return mkgeneric(pipeline,
mkqueue(pipeline, src),
"autoaudiosink")
720 def mkplaybacksink(pipeline, src, amplification = 0.1):
722 gst.element_factory_make(
"audioconvert"),
723 gst.element_factory_make(
"capsfilter"),
724 gst.element_factory_make(
"audioamplify"),
725 gst.element_factory_make(
"audioconvert"),
726 gst.element_factory_make(
"queue"),
727 gst.element_factory_make(
"autoaudiosink")
729 elems[1].set_property(
"caps", gst.Caps(
"audio/x-raw-float, width=64"))
730 elems[2].set_property(
"amplification", amplification)
731 elems[4].set_property(
"max-size-time", 1 * gst.SECOND)
733 gst.element_link_many(src, *elems)
738 def mkappsink(pipeline, src, max_buffers = 1, drop = False, **properties):
739 return mkgeneric(pipeline, src,
"appsink", sync =
False, async =
False, emit_signals =
True, max_buffers = max_buffers, drop = drop, **properties)
743 def __init__(self, appsink_new_buffer, appsinks = []):
744 self.
lock = threading.Lock()
755 for elem
in appsinks:
757 raise ValueError(
"duplicate appsinks %s" % repr(elem))
762 def add_sink(self, pipeline, src, drop = False, **properties):
764 assert "max_buffers" not in properties
765 elem =
mkappsink(pipeline, src, max_buffers = 1, drop = drop, **properties)
771 def appsink_handler(self, elem, eos):
775 self.at_eos.add(elem)
777 self.at_eos.discard(elem)
779 self.
appsinks[elem] = elem.get_last_buffer().timestamp
786 timestamps = [(t, e)
for e, t
in self.appsinks.items()
if e
not in self.
at_eos or t
is not None]
796 timestamp, elem_with_oldest = min(timestamps)
800 if timestamp
is None:
807 self.
appsinks[elem_with_oldest] =
None
814 Add a signal handler to write a pipeline graph upon receipt of the
815 first trigger buffer. The caps in the pipeline graph are not fully
816 negotiated until data comes out the end, so this version of the graph
817 shows the final formats on all links.
820 class AppsinkDumpDot(object):
823 n_lock = threading.Lock()
def __init__(self, pipeline, write_after, basename, verbose = False):
    """
    Store the pipeline, the write_after threshold and the verbose
    flag, and derive the dump file stem "<basename>.TRIGGERS".

    handler_id starts out as None; it is assigned by the code that
    connects execute() to a signal.
    """
    self.pipeline = pipeline
    self.handler_id = None
    self.write_after = write_after
    stem = "%s.%s" % (basename, "TRIGGERS")
    self.filestem = stem
    self.verbose = verbose
833 def execute(self, elem):
834 self.n_lock.acquire()
837 if self.n >= self.write_after:
838 write_dump_dot(self.pipeline, self.filestem, verbose = self.verbose)
840 self.n_lock.release()
841 elem.disconnect(self.handler_id)
843 for sink
in appsinks:
844 appsink_dump_dot = AppsinkDumpDot(pipeline, len(appsinks), basename = basename, verbose = verbose)
845 appsink_dump_dot.handler_id = sink.connect_after(
"new-buffer", appsink_dump_dot.execute)
def mkchecktimestamps(pipeline, src, name = None, silent = True, timestamp_fuzz = 1):
    """
    Build a "lal_checktimestamps" element by delegating to
    mkgeneric().

    name, silent and timestamp_fuzz are all passed on to mkgeneric()
    (name is passed even when it is None).  Returns whatever
    mkgeneric() returns.
    """
    element = mkgeneric(
        pipeline,
        src,
        "lal_checktimestamps",
        name = name,
        silent = silent,
        timestamp_fuzz = timestamp_fuzz
    )
    return element
854 return mkgeneric(pipeline, src,
"lal_peak", n = n)
857 def mkitac(pipeline, src, n, bank, autocorrelation_matrix = None, mask_matrix = None, snr_thresh = 0, sigmasq = None):
860 "bank_filename": bank,
861 "snr_thresh": snr_thresh
863 if autocorrelation_matrix
is not None:
864 properties[
"autocorrelation_matrix"] = pipeio.repack_complex_array_to_real(autocorrelation_matrix)
865 if mask_matrix
is not None:
866 properties[
"autocorrelation_mask"] = mask_matrix
867 if sigmasq
is not None:
868 properties[
"sigmasq"] = sigmasq
869 return mkgeneric(pipeline, src,
"lal_itac", **properties)
872 def mklhocoherentnull(pipeline, H1src, H2src, H1_impulse, H1_latency, H2_impulse, H2_latency, srate):
873 elem = mkgeneric(pipeline,
None,
"lal_lho_coherent_null", block_stride = srate, H1_impulse = H1_impulse, H2_impulse = H2_impulse, H1_latency = H1_latency, H2_latency = H2_latency)
874 for peer, padname
in ((H1src,
"H1sink"), (H2src,
"H2sink")):
875 if isinstance(peer, gst.Pad):
876 peer.get_parent_element().link_pads(peer, elem, padname)
877 elif peer
is not None:
878 peer.link_pads(
None, elem, padname)
881 def mkcomputegamma(pipeline, dctrl, exc, cos, sin, **properties):
882 elem = mkgeneric(pipeline,
None,
"lal_compute_gamma", **properties)
883 for peer, padname
in ((dctrl,
"dctrl_sink"), (exc,
"exc_sink"), (cos,
"cos"), (sin,
"sin")):
884 if isinstance(peer, gst.Pad):
885 peer.get_parent_element().link_pads(peer, elem, padname)
886 elif peer
is not None:
887 peer.link_pads(
None, elem, padname)
def mkbursttriggergen(pipeline, src, **properties):
    """
    Build a "lal_bursttriggergen" element by delegating to
    mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "lal_bursttriggergen", **properties)
    return element
def mkodctodqv(pipeline, src, **properties):
    """
    Build a "lal_odc_to_dqv" element by delegating to mkgeneric().

    Extra keyword arguments are forwarded as element properties.
    Returns whatever mkgeneric() returns.
    """
    element = mkgeneric(pipeline, src, "lal_odc_to_dqv", **properties)
    return element
def mktcpserversink(pipeline, src, **properties):
    """
    Build a "tcpserversink" element by delegating to mkgeneric().

    sync, sync_method, recover_policy, unit_type and units_soft_max
    (1024**3) are fixed here; any extra keyword arguments are
    forwarded as element properties.  Because the fixed settings are
    explicit keywords next to **properties, repeating one of them in
    properties raises TypeError.  Returns whatever mkgeneric()
    returns.
    """
    element = mkgeneric(
        pipeline,
        src,
        "tcpserversink",
        sync = True,
        sync_method = "latest-keyframe",
        recover_policy = "keyframe",
        unit_type = "bytes",
        units_soft_max = 1024**3,
        **properties
    )
    return element
903 """Calculate the output gain of GStreamer's stock audioresample element.
905 The audioresample element has a frequency response of unity "almost" all the
906 way up to the Nyquist frequency. However, for an input of unit variance
907 Gaussian noise, the output will have a variance very slightly less than 1.
908 The return value is the variance that the filter will produce for a given
909 "quality" setting and sample rate.
911 @param den The denominator of the ratio of the input and output sample rates
912 @param num The numerator of the ratio of the input and output sample rates
913 @return The variance of the output signal for unit variance input
915 The following example shows how to apply the correction factor using an
916 audioamplify element.
918 >>> from gstlal.pipeutil import *
919 >>> from gstlal.pipeparts import audioresample_variance_gain
920 >>> from gstlal import pipeio
922 >>> nsamples = 2 ** 17
925 >>> def handoff_handler(element, buffer, pad, (quality, filt_len, num, den)):
926 ... out_latency = numpy.ceil(float(den) / num * filt_len)
927 ... buf = pipeio.array_from_audio_buffer(buffer).flatten()
928 ... std = numpy.std(buf[out_latency:-out_latency])
929 ... print "quality=%2d, filt_len=%3d, num=%d, den=%d, stdev=%.2f" % (
930 ... quality, filt_len, num, den, std)
932 >>> for quality in range(11):
933 ... pipeline = gst.Pipeline()
934 ... correction = 1/numpy.sqrt(audioresample_variance_gain(quality, num, den))
935 ... elems = mkelems_in_bin(pipeline,
936 ... ('audiotestsrc', {'wave':'gaussian-noise','volume':1}),
937 ... ('capsfilter', {'caps':gst.Caps('audio/x-raw-float,width=64,rate=%d' % num)}),
938 ... ('audioresample', {'quality':quality}),
939 ... ('capsfilter', {'caps':gst.Caps('audio/x-raw-float,width=64,rate=%d' % den)}),
940 ... ('audioamplify', {'amplification':correction,'clipping-method':'none'}),
941 ... ('fakesink', {'signal-handoffs':True, 'num-buffers':1})
943 ... filt_len = elems[2].get_property('filter-length')
944 ... elems[0].set_property('samplesperbuffer', 2 * filt_len + nsamples)
945 ... if elems[-1].connect_after('handoff', handoff_handler, (quality, filt_len, num, den)) < 1:
946 ... raise RuntimeError
948 ... if pipeline.set_state(gst.STATE_PLAYING) is not gst.STATE_CHANGE_ASYNC:
949 ... raise RuntimeError
950 ... if not pipeline.get_bus().poll(gst.MESSAGE_EOS, -1):
951 ... raise RuntimeError
953 ... if pipeline.set_state(gst.STATE_NULL) is not gst.STATE_CHANGE_SUCCESS:
954 ... raise RuntimeError
956 quality= 0, filt_len= 8, num=2, den=1, stdev=1.00
957 quality= 1, filt_len= 16, num=2, den=1, stdev=1.00
958 quality= 2, filt_len= 32, num=2, den=1, stdev=1.00
959 quality= 3, filt_len= 48, num=2, den=1, stdev=1.00
960 quality= 4, filt_len= 64, num=2, den=1, stdev=1.00
961 quality= 5, filt_len= 80, num=2, den=1, stdev=1.00
962 quality= 6, filt_len= 96, num=2, den=1, stdev=1.00
963 quality= 7, filt_len=128, num=2, den=1, stdev=1.00
964 quality= 8, filt_len=160, num=2, den=1, stdev=1.00
965 quality= 9, filt_len=192, num=2, den=1, stdev=1.00
966 quality=10, filt_len=256, num=2, den=1, stdev=1.00
973 0.7224862140943990596,
974 0.7975021342935247892,
975 0.8547537598970208483,
976 0.8744072146753004704,
977 0.9075294214410336568,
978 0.9101523813406768859,
979 0.9280549396020538744,
980 0.9391809530012216189,
981 0.9539276644089494939,
982 0.9623083437067311285,
983 0.9684700588501590213
987 0.7539740617648067467,
988 0.8270076656536116122,
989 0.8835072979478705291,
990 0.8966758456219333651,
991 0.9253434087537378838,
992 0.9255866674042573239,
993 0.9346487800036394900,
994 0.9415331868209220190,
995 0.9524608799160205752,
996 0.9624372769883490220,
997 0.9704505626409354324
1014 This function needs the environment variable GST_DEBUG_DUMP_DOT_DIR
1015 to be set. The filename will be
1017 os.path.join($GST_DEBUG_DUMP_DOT_DIR, filestem + ".dot")
1019 If verbose is True, a message will be written to stderr.
1021 if "GST_DEBUG_DUMP_DOT_DIR" not in os.environ:
1022 raise ValueError(
"cannot write pipeline, environment variable GST_DEBUG_DUMP_DOT_DIR is not set")
1023 gst.DEBUG_BIN_TO_DOT_FILE(pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filestem)
1025 print >>sys.stderr,
"Wrote pipeline to %s" % os.path.join(os.environ[
"GST_DEBUG_DUMP_DOT_DIR"],
"%s.dot" % filestem)