38 from gstlal
import pipeparts
39 from gstlal
import pipeio
40 from gstlal
import simplehandler
# Module-level call: gobject.threads_init() runs once, when this module is
# imported, before any of the helpers below are used.
43 gobject.threads_init()
def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
    """
    Build an audiotestsrc -> capsfilter -> togglecomplex chain in pipeline.

    The capsfilter requests "audio/x-raw-float" with the given width and
    rate and two channels.  When verbose is true a progressreport element
    named "src" is appended.

    buffer_length, rate -- blocksize is 8 * int(buffer_length * rate)
    test_duration -- num_buffers is int(test_duration / buffer_length)
    wave, freq, is_live -- forwarded unchanged to mkaudiotestsrc

    Returns the last element of the chain so callers can keep linking.
    """
    # NOTE(review): the factor 8 looks like bytes per 64-bit sample; it
    # does not follow the width argument -- confirm.
    head = pipeparts.mkaudiotestsrc(
        pipeline,
        wave = wave,
        freq = freq,
        blocksize = 8 * int(buffer_length * rate),
        volume = 1,
        num_buffers = int(test_duration / buffer_length),
        is_live = is_live
    )
    head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw-float, width=%d, rate=%d, channels=2" % (width, rate))
    head = pipeparts.mktogglecomplex(pipeline, head)
    # honour the verbose flag instead of always attaching the reporter
    if verbose:
        head = pipeparts.mkprogressreport(pipeline, head, "src")
    # gapped_complex_test_src() consumes the return value
    return head
def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
    """
    Build a test source chain in pipeline and return its last element.

    The source is either a fake LIGO source (instrument "H1", channel
    "LSC-STRAIN") or an audiotestsrc configured from wave, freq,
    buffer_length, rate, test_duration and is_live.  A capsfilter
    requesting "audio/x-raw-float" with the given width, rate and channels
    follows, and when verbose is true a progressreport element named "src"
    is appended.
    """
    # NOTE(review): only one source can feed the chain, so the two
    # constructors must be alternatives.  The selecting condition is not
    # visible in this listing; wave == "ligo" is assumed -- confirm.
    if wave == "ligo":
        head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN")
    else:
        head = pipeparts.mkaudiotestsrc(
            pipeline,
            wave = wave,
            freq = freq,
            blocksize = 8 * int(buffer_length * rate),
            volume = 1,
            num_buffers = int(test_duration / buffer_length),
            is_live = is_live
        )
    # NOTE(review): the capsfilter is applied to whichever source was
    # chosen, as in the visible statement order -- confirm it is meant to
    # cover the fake LIGO branch as well.
    head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw-float, width=%d, rate=%d, channels=%d" % (width, rate, channels))
    # honour the verbose flag instead of always attaching the reporter
    if verbose:
        head = pipeparts.mkprogressreport(pipeline, head, "src")
    # gapped_test_src() consumes the return value
    return head
def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True):
    """
    Build a test_src() chain, optionally gated by a control stream.

    When gap_frequency is None the plain test_src() output is returned.
    Otherwise a second audiotestsrc (wave 0, frequency gap_frequency,
    caps "audio/x-raw-float, width=32, rate=<rate>, channels=1") is built
    and used as the control input of a gate element with threshold
    gap_threshold; the gate is returned.

    If control_dump_filename is not None the control stream is tee'd and
    one branch is written to that file with mknxydumpsink.
    """
    src = test_src(
        pipeline,
        buffer_length = buffer_length,
        rate = rate,
        width = width,
        channels = channels,
        test_duration = test_duration,
        wave = wave,
        freq = freq,
        is_live = is_live,
        verbose = verbose
    )
    # no gap frequency means there is nothing to build a control stream
    # from, so hand back the ungated source
    if gap_frequency is None:
        return src

    control_src = pipeparts.mkaudiotestsrc(
        pipeline,
        wave = 0,
        freq = gap_frequency,
        blocksize = 8 * int(buffer_length * rate),
        volume = 1,
        num_buffers = int(test_duration / buffer_length)
    )
    control = pipeparts.mkcapsfilter(pipeline, control_src, "audio/x-raw-float, width=32, rate=%d, channels=1" % rate)
    if control_dump_filename is not None:
        # one tee branch goes to the dump file, the other on to the gate;
        # each branch gets its own queue
        control = pipeparts.mktee(pipeline, control)
        pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
        control = pipeparts.mkqueue(pipeline, control)
    return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold)
def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
    """
    Build a complex_test_src() chain, optionally tagged and gated.

    tags -- when not None, a taginject element carrying tags is appended
    to the source.
    gap_frequency -- when None the (possibly tagged) source is returned
    as is.  Otherwise a control audiotestsrc (wave 0, frequency
    gap_frequency, caps "audio/x-raw-float, width=32, rate=<rate>,
    channels=1") drives a gate with threshold gap_threshold.  The gate is
    wrapped in a togglecomplex element on each side, and the outer
    togglecomplex element is returned.
    control_dump_filename -- when not None the control stream is routed
    through mknxydumpsinktee so it is also written to that file.
    """
    src = complex_test_src(
        pipeline,
        buffer_length = buffer_length,
        rate = rate,
        width = width,
        test_duration = test_duration,
        wave = wave,
        freq = freq,
        is_live = is_live,
        verbose = verbose
    )
    # tags defaults to None; only inject when the caller supplied some
    if tags is not None:
        src = pipeparts.mktaginject(pipeline, src, tags)
    # no gap frequency means there is nothing to gate with
    if gap_frequency is None:
        return src

    control_src = pipeparts.mkaudiotestsrc(
        pipeline,
        wave = 0,
        freq = gap_frequency,
        blocksize = 8 * int(buffer_length * rate),
        volume = 1,
        num_buffers = int(test_duration / buffer_length)
    )
    control = pipeparts.mkcapsfilter(pipeline, control_src, "audio/x-raw-float, width=32, rate=%d, channels=1" % rate)
    if control_dump_filename is not None:
        control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
        control = pipeparts.mkqueue(pipeline, control)
    gated = pipeparts.mkgate(
        pipeline,
        pipeparts.mktogglecomplex(pipeline, src),
        control = control,
        threshold = gap_threshold
    )
    return pipeparts.mktogglecomplex(pipeline, gated)
def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
    """
    Build a pipeline with pipelinefunc and run it.

    pipelinefunc -- called as pipelinefunc(gst.Pipeline(name), name,
    **pipelinefunc_kwargs); its return value is used as the pipeline.
    name -- pipeline name, also printed in the banner on stderr.
    segment -- optional pair of objects providing ns(); when given, the
    pipeline is first set to PAUSED and a flushing seek to
    [segment[0].ns(), segment[1].ns()] is sent before it is set to
    PLAYING.

    Raises RuntimeError if a state change reports
    gst.STATE_CHANGE_FAILURE.
    """
    print >>sys.stderr, "=== Running Test %s ===" % name
    mainloop = gobject.MainLoop()
    pipeline = pipelinefunc(gst.Pipeline(name), name, **pipelinefunc_kwargs)
    # NOTE(review): the reference is kept although it is not used again;
    # presumably the handler must stay alive while the loop runs.
    handler = simplehandler.Handler(mainloop, pipeline)
    if segment is not None:
        if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_FAILURE:
            # this is the PAUSED transition; name that state in the error
            raise RuntimeError("pipeline failed to enter PAUSED state")
        pipeline.seek(
            1.0,
            gst.Format(gst.FORMAT_TIME),
            gst.SEEK_FLAG_FLUSH,
            gst.SEEK_TYPE_SET, segment[0].ns(),
            gst.SEEK_TYPE_SET, segment[1].ns()
        )
    if pipeline.set_state(gst.STATE_PLAYING) == gst.STATE_CHANGE_FAILURE:
        raise RuntimeError("pipeline failed to enter PLAYING state")
    # NOTE(review): the lines after the PLAYING check are not visible in
    # this listing.  The loop created above is otherwise never run, and
    # transform_arrays() relies on the pipeline having run by the time
    # this function returns -- confirm.
    mainloop.run()
def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
    """
    Push arrays through an element chain and collect the resulting arrays.

    An appsrc -> elemfunc(...) -> appsink pipeline named name is built and
    run with build_and_run().

    input_arrays -- iterable of arrays; it is copied to a list, and the
    first entry determines the appsrc caps via pipeio.caps_from_array().
    elemfunc -- called as elemfunc(pipeline, head, **elemfunc_kwargs) and
    must return the element to connect to the appsink.
    rate -- forwarded to pipeio.caps_from_array() and
    pipeio.audio_buffer_from_array().

    Returns the list of arrays collected from the appsink, one per
    "new-buffer" signal.  Raises IndexError if input_arrays is empty.
    """
    input_arrays = list(input_arrays)
    output_arrays = []

    pipeline = gst.Pipeline(name)

    head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(input_arrays[0], rate = rate))

    def need_data(elem, arg, user_data):
        # user_data is the (pending arrays, rate) pair given to connect()
        # below; unpacked here rather than in the signature
        pending, sample_rate = user_data
        if pending:
            elem.emit("push-buffer", pipeio.audio_buffer_from_array(pending.pop(0), 0, 0, sample_rate))
        else:
            # nothing left to send; pop(0) on the empty list would raise
            elem.emit("end-of-stream")
    head.connect("need-data", need_data, (input_arrays, rate))

    head = elemfunc(pipeline, head, **elemfunc_kwargs)

    head = pipeparts.mkappsink(pipeline, head)

    def appsink_get_array(elem, collected):
        collected.append(pipeio.array_from_audio_buffer(elem.get_last_buffer()))
    head.connect("new-buffer", appsink_get_array, output_arrays)

    # the pipeline already exists, so hand build_and_run() a factory that
    # ignores its arguments and returns it
    build_and_run((lambda *args, **kwargs: pipeline), name)

    return output_arrays