gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
test_common.py
1 # Copyright (C) 2009--2011,2013 Kipp Cannon
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 #
18 # =============================================================================
19 #
20 # Preamble
21 #
22 # =============================================================================
23 #
24 
25 
26 import numpy
27 import sys
28 
29 
30 import pygtk
31 pygtk.require("2.0")
32 import gobject
33 import pygst
34 pygst.require("0.10")
35 import gst
36 
37 
38 from gstlal import pipeparts
39 from gstlal import pipeio
40 from gstlal import simplehandler
41 
42 
43 gobject.threads_init()
44 
45 
46 #
47 # =============================================================================
48 #
49 # Utilities
50 #
51 # =============================================================================
52 #
53 
54 
def complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
    """
    Add a complex-valued test signal source to pipeline and return the
    last element of the chain that was built.

    The chain is audiotestsrc -> capsfilter (two-channel float audio,
    width bits per channel) -> togglecomplex, optionally followed by a
    progressreport element named "src" when verbose is true.

    buffer_length and test_duration are in seconds, rate is in samples
    per second.  wave, freq and is_live are passed straight through to
    pipeparts.mkaudiotestsrc().
    """
    # the caps filter below fixes the stream at two channels of width
    # bits each, so one sample frame occupies (width / 8) * 2 bytes.
    # blocksize is measured in bytes, so it has to be scaled by the frame
    # size for each buffer to really span buffer_length seconds (a fixed 8
    # bytes per frame produced buffers of half the requested length here).
    channels = 2
    bytes_per_frame = (width // 8) * channels
    samples_per_buffer = int(buffer_length * rate)

    head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, blocksize = bytes_per_frame * samples_per_buffer, volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live)
    head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw-float, width=%d, rate=%d, channels=%d" % (width, rate, channels))
    head = pipeparts.mktogglecomplex(pipeline, head)
    if verbose:
        head = pipeparts.mkprogressreport(pipeline, head, "src")
    return head
62 
63 
def test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, is_live = False, verbose = True):
    """
    Add a real-valued test signal source to pipeline and return the last
    element of the chain that was built.

    If wave is the string "ligo" the source is
    pipeparts.mkfakeLIGOsrc() for instrument "H1", channel "LSC-STRAIN".
    Otherwise the source is an audiotestsrc followed by a caps filter
    selecting float audio with the requested width, rate and channels.
    When verbose is true a progressreport element named "src" is
    appended.

    buffer_length and test_duration are in seconds, rate is in samples
    per second.
    """
    if wave == "ligo":
        head = pipeparts.mkfakeLIGOsrc(pipeline, instrument = "H1", channel_name = "LSC-STRAIN")
    else:
        # blocksize is measured in bytes:  scale the number of samples
        # per buffer by the size of one sample frame instead of assuming
        # 8 bytes, which is only right for width = 64, channels = 1
        bytes_per_frame = (width // 8) * channels
        head = pipeparts.mkaudiotestsrc(pipeline, wave = wave, freq = freq, blocksize = bytes_per_frame * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length), is_live = is_live)
        # NOTE(review):  the caps filter is applied to the audiotestsrc
        # branch only;  the indentation of this line was not recoverable
        # from the listing -- confirm against the upstream file
        head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw-float, width=%d, rate=%d, channels=%d" % (width, rate, channels))
    if verbose:
        head = pipeparts.mkprogressreport(pipeline, head, "src")
    return head
73 
74 
def gapped_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, channels = 1, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, is_live = False, verbose = True):
    """
    Build a test_src() and, if gap_frequency is not None, pass it
    through a gate driven by a second audiotestsrc so that the output
    contains gaps.  Returns the last element of the chain.

    With gap_frequency set to None the plain test_src() output is
    returned unchanged.  Otherwise a 32-bit single-channel control
    stream with wave = 0 and frequency gap_frequency is created at the
    same sample rate and used as the control input of
    pipeparts.mkgate() with threshold gap_threshold.  If
    control_dump_filename is not None the control stream is also written
    to that file through pipeparts.mknxydumpsink().
    """
    src = test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, channels = channels, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
    if gap_frequency is None:
        return src

    # the control stream's caps are fixed at width=32, channels=1, i.e. 4
    # bytes per sample.  blocksize is measured in bytes, so use 4 (not 8)
    # bytes per sample to get control buffers of buffer_length seconds.
    control = pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 4 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length))
    control = pipeparts.mkcapsfilter(pipeline, control, "audio/x-raw-float, width=32, rate=%d, channels=1" % rate)

    if control_dump_filename is not None:
        # split the control stream:  one queued branch goes to the dump
        # file, the other queued branch feeds the gate
        control = pipeparts.mktee(pipeline, control)
        pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
        # NOTE(review):  this queue is placed inside the "if" (it is only
        # needed downstream of the tee);  the indentation of this line
        # was not recoverable from the listing -- confirm upstream
        control = pipeparts.mkqueue(pipeline, control)

    return pipeparts.mkgate(pipeline, src, control = control, threshold = gap_threshold)
85 
86 
def gapped_complex_test_src(pipeline, buffer_length = 1.0, rate = 2048, width = 64, test_duration = 10.0, wave = 5, freq = 0, gap_frequency = None, gap_threshold = None, control_dump_filename = None, tags = None, is_live = False, verbose = True):
    """
    Build a complex_test_src() and, if gap_frequency is not None, pass
    it through a gate driven by a second audiotestsrc so that the output
    contains gaps.  Returns the last element of the chain.

    If tags is not None it is handed to pipeparts.mktaginject(), which
    is inserted directly after the source.  With gap_frequency set to
    None the (possibly tagged) source is returned without a gate.
    Otherwise a 32-bit single-channel control stream with wave = 0 and
    frequency gap_frequency is created at the same sample rate and used
    as the control input of pipeparts.mkgate() with threshold
    gap_threshold.  If control_dump_filename is not None the control
    stream is also written to that file through
    pipeparts.mknxydumpsinktee().
    """
    src = complex_test_src(pipeline, buffer_length = buffer_length, rate = rate, width = width, test_duration = test_duration, wave = wave, freq = freq, is_live = is_live, verbose = verbose)
    if tags is not None:
        src = pipeparts.mktaginject(pipeline, src, tags)
    if gap_frequency is None:
        return src

    # the control stream's caps are fixed at width=32, channels=1, i.e. 4
    # bytes per sample.  blocksize is measured in bytes, so use 4 (not 8)
    # bytes per sample to get control buffers of buffer_length seconds.
    control = pipeparts.mkaudiotestsrc(pipeline, wave = 0, freq = gap_frequency, blocksize = 4 * int(buffer_length * rate), volume = 1, num_buffers = int(test_duration / buffer_length))
    control = pipeparts.mkcapsfilter(pipeline, control, "audio/x-raw-float, width=32, rate=%d, channels=1" % rate)

    if control_dump_filename is not None:
        control = pipeparts.mknxydumpsinktee(pipeline, pipeparts.mkqueue(pipeline, control), control_dump_filename)
        # NOTE(review):  this queue is placed inside the "if" (it is only
        # needed downstream of the tee);  the indentation of this line
        # was not recoverable from the listing -- confirm upstream
        control = pipeparts.mkqueue(pipeline, control)

    # the gate is wrapped in a pair of togglecomplex elements:  one
    # before it and one after it (presumably so that the gate sees the
    # two-channel real representation -- confirm against pipeparts)
    gated = pipeparts.mkgate(pipeline, pipeparts.mktogglecomplex(pipeline, src), control = control, threshold = gap_threshold)
    return pipeparts.mktogglecomplex(pipeline, gated)
98 
99 
100 #
101 # =============================================================================
102 #
103 # Pipeline Builder
104 #
105 # =============================================================================
106 #
107 
108 
def build_and_run(pipelinefunc, name, segment = None, **pipelinefunc_kwargs):
    """
    Build a pipeline with pipelinefunc and run it in a GObject main loop.

    pipelinefunc is called as pipelinefunc(gst.Pipeline(name), name,
    **pipelinefunc_kwargs) and must return the pipeline to run.  If
    segment is not None it must be a two-element sequence whose items
    provide an ns() method;  the pipeline is first put into the PAUSED
    state and a flushing seek to [segment[0].ns(), segment[1].ns()) is
    sent before playback starts.

    Raises RuntimeError if the pipeline fails to change state.  Blocks
    in mainloop.run() until the main loop is stopped.
    """
    sys.stderr.write("=== Running Test %s ===\n" % name)
    mainloop = gobject.MainLoop()
    pipeline = pipelinefunc(gst.Pipeline(name), name, **pipelinefunc_kwargs)
    # the handler is given both the main loop and the pipeline
    # (presumably it watches the pipeline's bus and stops the loop on
    # EOS or error -- see simplehandler);  the reference is held until
    # this function returns
    handler = simplehandler.Handler(mainloop, pipeline)
    if segment is not None:
        if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_FAILURE:
            raise RuntimeError("pipeline failed to enter PAUSED state")
        # NOTE(review):  the return value of seek() is not checked
        pipeline.seek(1.0, gst.Format(gst.FORMAT_TIME), gst.SEEK_FLAG_FLUSH, gst.SEEK_TYPE_SET, segment[0].ns(), gst.SEEK_TYPE_SET, segment[1].ns())
    if pipeline.set_state(gst.STATE_PLAYING) == gst.STATE_CHANGE_FAILURE:
        raise RuntimeError("pipeline failed to enter PLAYING state")
    mainloop.run()
121 
122 
123 #
124 # =============================================================================
125 #
126 # Push Arrays Through an Element
127 #
128 # =============================================================================
129 #
130 
131 
def transform_arrays(input_arrays, elemfunc, name, rate = 1, **elemfunc_kwargs):
    """
    Push a sequence of arrays through an element and collect the output.

    An appsrc -> elemfunc(...) -> appsink pipeline named name is built.
    The appsrc caps are derived from the first input array with
    pipeio.caps_from_array() at the given rate.  elemfunc is called as
    elemfunc(pipeline, head, **elemfunc_kwargs) and must return the
    element to which the appsink is attached.  Each input array is
    pushed as one buffer;  end-of-stream is sent once the inputs are
    exhausted.

    Returns the list of arrays converted from the buffers received by
    the appsink.  The caller's input_arrays object is not modified.

    Raises IndexError if input_arrays is empty, since the caps cannot be
    determined without at least one array.
    """
    from collections import deque

    # private copy so arrays can be consumed from the front as they are
    # pushed into the pipeline
    pending = deque(input_arrays)
    if not pending:
        raise IndexError("transform_arrays() requires at least one input array")
    output_arrays = []

    pipeline = gst.Pipeline(name)

    head = pipeparts.mkgeneric(pipeline, None, "appsrc", caps = pipeio.caps_from_array(pending[0], rate = rate))
    def need_data(elem, arg, user_data):
        # user_data is the (pending, rate) tuple given to connect();  it
        # is unpacked here because tuple parameters in a function
        # signature are not valid syntax in Python 3
        pending, rate = user_data
        if pending:
            # NOTE(review):  every buffer is created with the same two
            # zero arguments (presumably timestamp and offset -- confirm
            # against pipeio.audio_buffer_from_array)
            elem.emit("push-buffer", pipeio.audio_buffer_from_array(pending.popleft(), 0, 0, rate))
        else:
            elem.emit("end-of-stream")
    head.connect("need-data", need_data, (pending, rate))

    head = elemfunc(pipeline, head, **elemfunc_kwargs)

    head = pipeparts.mkappsink(pipeline, head)
    def appsink_get_array(elem, output_arrays):
        output_arrays.append(pipeio.array_from_audio_buffer(elem.get_last_buffer()))
    head.connect("new-buffer", appsink_get_array, output_arrays)

    # build_and_run() wants a pipeline factory;  give it one that ignores
    # its arguments and returns the pipeline assembled above
    build_and_run((lambda *args, **kwargs: pipeline), name)

    return output_arrays