gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
test_gstlal_adder.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2012 Karsten Wiesner
4 #
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 
19 #
20 # ==============================================================================
21 #
22 # Preamble
23 #
24 # ==============================================================================
25 #
26 """
27 unittest for the gstlal GstLALAdder class
28 """
29 __author__ = "Karsten Wiesner <karsten.wiesner@ligo.org>"
30 __copyright__ = "Copyright 2013, Karsten Wiesner"
31 
32 # The following snippet is taken from
33 # http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
34 import pygtk
35 pygtk.require("2.0")
36 import gobject
37 gobject.threads_init()
38 import pygst
39 pygst.require("0.10")
40 import gst
41 
42 from gstlal import simplehandler
43 from gstlal import pipeparts
44 
45 #from pylal.xlal.datatypes.snglinspiraltable import from_buffer as sngl_inspirals_from_buffer
46 
47 import unittest
48 import random
49 
50 import scipy
51 import numpy as np
52 
53 from pylab import figure, show
54 
55 
class TestGstLALAdder(unittest.TestCase):
    """Compare the output of an adder element with a numpy reference sum.

    Two audiotestsrc inputs ("InputA" at 880 Hz, "InputB" at 666 Hz) are
    fed into the device under test (DUT).  Appsinks capture both inputs and
    the DUT output so the tests can check ``A + B == DUT`` sample by sample.

    Each instance is bound to one test method and one pipeline
    configuration, both given to the constructor, so the suite has to be
    assembled by hand (``suite.addTest(TestGstLALAdder(name, ...))``).
    """

    def __init__(self, testname,
                 DUT="adder",
                 quiet=True,
                 volume=0.25,
                 bits_per_sample=32,
                 sample_rate=1000,
                 samples_per_buffer=1000,
                 num_of_buffers=6,
                 timestamp_offs_B=0,      # timestamp offset at InputB
                 spb_offs_B=0,            # samples-per-buffer offset at InputB
                 num_bufs_offset_B=0):    # num_of_buffers offset at InputB
        """Bind the test case to ``testname`` and store the configuration.

        testname            -- name of the test method to run
        DUT                 -- factory name of the element under test
        quiet               -- if false, print progress and write dump files
        volume              -- volume of both test sources
        bits_per_sample     -- float width of the stream, 32 or 64
                               (anything else raises KeyError)
        sample_rate         -- sample rate requested through the caps filter
        samples_per_buffer  -- samples per buffer at InputA
        num_of_buffers      -- number of buffers produced by InputA
        timestamp_offs_B    -- timestamp offset applied to InputB
        spb_offs_B          -- added to samples_per_buffer for InputB
        num_bufs_offset_B   -- added to num_of_buffers for InputB
        """
        super(TestGstLALAdder, self).__init__(testname)

        random.seed()

        # tweak these member vars for individual test cases ----------------
        self.DUT = DUT
        self.quiet = quiet
        self.volume = volume
        self.bits_per_sample = bits_per_sample
        self.sample_rate = sample_rate
        self.samples_per_buffer = samples_per_buffer
        self.num_of_buffers = num_of_buffers
        self.timestamp_offs_B = timestamp_offs_B
        self.spb_offs_B = spb_offs_B
        self.num_bufs_offset_B = num_bufs_offset_B

        # internal member vars --------------------------------------------
        # NOTE(review): the tests read timestamp_offs_B_in_smpls but nothing
        # assigned it.  Reconstructed here assuming timestamp_offs_B is in
        # nanoseconds (0.2e9 at 1000 Hz -> 200 samples) -- confirm the
        # rounding against upstream.
        self.timestamp_offs_B_in_smpls = int(
            round(self.timestamp_offs_B * self.sample_rate / 1e9))
        self.idx_buf_dut_out = 0    # index of the buffer sent on DUT output
        self.accu_buf_dut_out = 0   # accumulated output samples at DUT out
        # choose the numpy dtype matching the stream's float width
        numpy_float_width_map = {32: np.float32, 64: np.float64}
        self.numpy_float_width = numpy_float_width_map[self.bits_per_sample]

    # Test Fixture --------------------------------------------------------
    def setUp(self):
        """Build the pipeline: InputA + InputB -> DUT -> appsinks."""
        if not self.quiet:
            print("")
            print("setUp with bps={0} np-preci={1}".format(
                self.bits_per_sample, self.numpy_float_width))

        self.pipeline = gst.Pipeline("test_gstlal_adder")
        self.mainloop = gobject.MainLoop()
        self.handler = simplehandler.Handler(self.mainloop, self.pipeline)

        # sample stores filled by the appsink callbacks
        self.dut_out_buf = np.array([])
        self.src_a_buf = np.array([])
        self.src_b_buf = np.array([])

        tee_a = self._make_input(
            "InputA", 880,
            self.samples_per_buffer,
            self.num_of_buffers,
            "gstlal_adder_unittest_InputA.dump")

        # InputB may differ from InputA in buffer size, buffer count and
        # timestamp offset
        tee_b = self._make_input(
            "InputB", 666,
            self.samples_per_buffer + self.spb_offs_B,
            self.num_of_buffers + self.num_bufs_offset_B,
            "gstlal_adder_unittest_InputB.dump",
            timestamp_offset=self.timestamp_offs_B)

        # DUT (Device Under Test)
        adder = gst.element_factory_make(self.DUT)
        adder.set_property("name", "DUT")
        if self.DUT == "lal_adder":
            # adder.set_property("caps", ...) would take a reference to the
            # supplied GstCaps object; only "sync" is set here.
            adder.set_property("sync", True)

        self.pipeline.add(adder)
        pipeparts.mkqueue(self.pipeline, tee_a).link(adder)
        pipeparts.mkqueue(self.pipeline, tee_b).link(adder)

        # Output
        tee_out = pipeparts.mktee(self.pipeline, adder)

        # autoaudiosink can negotiate on 32 bps only
        if (not self.quiet) and (self.bits_per_sample == 32):
            pipeparts.mknxydumpsink(
                self.pipeline,
                pipeparts.mkqueue(self.pipeline, tee_out),
                "gstlal_adder_unittest_Output.dump")
            sink = gst.element_factory_make("autoaudiosink")
            self.pipeline.add(sink)
            pipeparts.mkqueue(self.pipeline, tee_out).link(sink)

        # Fetch buffers from the pipeline: DUT output, then source a and b
        self._attach_appsink(tee_out, self.on_dut_out_appsink_new_buffer)
        self._attach_appsink(tee_a, self.on_src_a_appsink_new_buffer)
        self._attach_appsink(tee_b, self.on_src_b_appsink_new_buffer)

    def _make_input(self, name, freq, samples_per_buffer, num_buffers,
                    dump_file, **src_properties):
        """Create one test source behind a caps filter and return its tee.

        Extra keyword arguments are forwarded to pipeparts.mkaudiotestsrc.
        When not quiet, a dump sink writing ``dump_file`` is attached.
        """
        src = pipeparts.mkaudiotestsrc(
            self.pipeline, wave=0, freq=freq,
            samplesperbuffer=samples_per_buffer,
            volume=self.volume,
            num_buffers=num_buffers,
            name=name,
            **src_properties)
        caps = "audio/x-raw-float, width={0}, rate={1}".format(
            self.bits_per_sample, self.sample_rate)
        capsfilter = pipeparts.mkcapsfilter(self.pipeline, src, caps)
        tee = pipeparts.mktee(self.pipeline, capsfilter)
        if not self.quiet:
            pipeparts.mknxydumpsink(
                self.pipeline, pipeparts.mkqueue(self.pipeline, tee),
                dump_file)
        return tee

    def _attach_appsink(self, tee, callback):
        """Hang a queue + appsink off ``tee`` and route new buffers to
        ``callback``."""
        appsink = pipeparts.mkappsink(
            self.pipeline, pipeparts.mkqueue(self.pipeline, tee))
        appsink.connect("new-buffer", callback)
        return appsink

    def _samples_from_buffer(self, gst_buffer):
        """Return the payload of ``gst_buffer`` as a 1-d array of samples.

        The sample count is derived from the payload length, so buffers of
        any size are handled; trailing bytes that do not fill a whole
        sample are ignored.
        """
        payload = gst_buffer.data
        # floor division keeps the count an integer on every Python version
        num_samples = len(payload) // (self.bits_per_sample // 8)
        return np.ndarray(shape=(num_samples,),
                          buffer=payload,
                          dtype=self.numpy_float_width)

    # callback: dut_out_appsink has received a buffer
    def on_dut_out_appsink_new_buffer(self, element):
        """Append the DUT output buffer to dut_out_buf and update counters."""
        gst_buffer = element.emit('pull-buffer')
        chunk = self._samples_from_buffer(gst_buffer)
        self.accu_buf_dut_out += len(chunk)

        if not self.quiet:
            # report the payload length, not the Python object footprint
            print("DUT send buffer no.: {0} of size: {1} bytes ; {2} samples ; accu-samples {3}".format(
                self.idx_buf_dut_out,
                len(gst_buffer.data),
                len(chunk),
                self.accu_buf_dut_out))

        self.dut_out_buf = np.concatenate([self.dut_out_buf, chunk])
        self.idx_buf_dut_out += 1
        return True

    # callback: src_a_appsink has received a buffer
    def on_src_a_appsink_new_buffer(self, element):
        """Append the InputA buffer to src_a_buf."""
        chunk = self._samples_from_buffer(element.emit('pull-buffer'))
        self.src_a_buf = np.concatenate([self.src_a_buf, chunk])
        return True

    # callback: src_b_appsink has received a buffer
    def on_src_b_appsink_new_buffer(self, element):
        """Append the InputB buffer to src_b_buf."""
        chunk = self._samples_from_buffer(element.emit('pull-buffer'))
        self.src_b_buf = np.concatenate([self.src_b_buf, chunk])
        return True

    def tearDown(self):
        """Return the pipeline to NULL so its elements release resources."""
        # NOTE(review): harmless if the handler already did this on EOS --
        # confirm against simplehandler.Handler.
        self.pipeline.set_state(gst.STATE_NULL)

    # Helpers shared by the unit tests ---------------------------------------

    def _plot_signals(self, axes, title, legend_loc, xlim=None):
        """Draw src a, src b (shifted by the InputB offset) and the DUT
        output on ``axes``; with ``xlim`` the view is restricted to that
        range and a grid is shown."""
        if xlim is not None:
            axes.set_xlim(*xlim)
            axes.grid()
        axes.plot(self.src_a_buf, 'g.', label='src a')
        # src b starts timestamp_offs_B_in_smpls samples after src a
        shifted_x = np.arange(len(self.src_b_buf)) + self.timestamp_offs_B_in_smpls
        axes.plot(shifted_x, self.src_b_buf, 'b.--', label='src b')
        axes.plot(self.dut_out_buf, 'r', label='DUT out')
        axes.set_xlabel('samples')
        axes.set_ylabel('volume')
        axes.legend(loc=legend_loc)
        axes.set_title(title)

    def _run_and_get_max_abs_error(self, dot_name):
        """Run the pipeline until the main loop returns and give back the
        largest absolute difference between the numpy sum of both inputs
        and the DUT output."""
        self.pipeline.set_state(gst.STATE_PLAYING)
        if not self.quiet:
            pipeparts.write_dump_dot(self.pipeline, dot_name, verbose=True)
        self.mainloop.run()

        # pad A at the end and B at the front so both line up with the
        # DUT output, which spans the shifted union of the two inputs
        padding = np.zeros(self.timestamp_offs_B_in_smpls, dtype=float)
        a = np.append(self.src_a_buf, padding)
        b = np.append(padding, self.src_b_buf)

        absmax = np.amax(np.absolute(a + b - self.dut_out_buf))
        if not self.quiet:
            print("")
            print("maximum of absolute differences from numpy reference add= {0}".format(absmax))
        return absmax

    # Unit tests -------------------------------

    def test_1_plot_signals(self):
        """
        Test 1: plot both inputs and the DUT output (interactive)
        """
        self.pipeline.set_state(gst.STATE_PLAYING)
        pipeparts.write_dump_dot(self.pipeline, "test_1_plot_signals",
                                 verbose=True)
        self.mainloop.run()

        zoom_in = 50  # samples shown on each side of the offset position
        left = self.timestamp_offs_B_in_smpls
        right = len(self.dut_out_buf) - self.timestamp_offs_B_in_smpls

        # left side offset region
        self._plot_signals(
            figure().add_subplot(1, 1, 1),
            'Left side region adding buffer A and B(w/ offset timestamp)',
            'upper right',
            xlim=(left - zoom_in, left + zoom_in))

        # right side offset region
        self._plot_signals(
            figure().add_subplot(1, 1, 1),
            'Right side region adding buffer A and B(w/ offset timestamp)',
            'upper left',
            xlim=(right - zoom_in, right + zoom_in))

        # everything
        self._plot_signals(
            figure().add_subplot(1, 1, 1),
            'Adding buffer A and B(w/ offset timestamp)',
            'upper right')

        show()

    def test_2_32bps(self):
        """
        Test 2: DUT output matches the numpy sum at 32 bits per sample
        """
        absmax = self._run_and_get_max_abs_error("test_2_quiet_at_32bps")
        self.assertFalse(absmax >= 1.8e-8, "OOOPS!")

    def test_3_64bps(self):
        """
        Test 3: DUT output matches the numpy sum at 64 bits per sample
        """
        absmax = self._run_and_get_max_abs_error("test_3_quiet_at_64bps")
        self.assertFalse(absmax >= 1.0e-20, "OOOPS!")
361 
362 
# Assemble the test suite by hand and run it ---------------------------------
#
# Every TestGstLALAdder instance pairs one test method with one pipeline
# configuration, so the cases are added explicitly.

suite = unittest.TestSuite()

# The plotting test is disabled.  To use it, add one of these:
#
# suite.addTest(TestGstLALAdder("test_1_plot_signals"))  # regular adder
#
# suite.addTest(TestGstLALAdder("test_1_plot_signals",
#     DUT="lal_adder", quiet=False, volume=0.25, bits_per_sample=32,
#     sample_rate=1000, samples_per_buffer=1000, num_of_buffers=6,
#     timestamp_offs_B=0.2e9,     # 200 samples
#     spb_offs_B=200, num_bufs_offset_B=-1))
#
# suite.addTest(TestGstLALAdder("test_1_plot_signals",
#     DUT="lal_adder", quiet=False, volume=0.25, bits_per_sample=32,
#     sample_rate=44100, samples_per_buffer=1000, num_of_buffers=100,
#     timestamp_offs_B=0.0068e9,  # about 300 samples
#     spb_offs_B=250,             # 80 buffers at 1250 samples
#     num_bufs_offset_B=-20))     # ==> 100000 samples

# default settings: regular gst adder, no time shift
for default_case in ("test_2_32bps", "test_3_64bps"):
    suite.addTest(TestGstLALAdder(default_case))

# lal_adder with InputB shifted in time and using a different buffer layout
for shifted_case, float_width in (("test_2_32bps", 32), ("test_3_64bps", 64)):
    suite.addTest(TestGstLALAdder(shifted_case,
                                  DUT="lal_adder",
                                  quiet=True,
                                  volume=0.25,
                                  bits_per_sample=float_width,
                                  sample_rate=1000,
                                  samples_per_buffer=1000,
                                  num_of_buffers=6,
                                  timestamp_offs_B=0.2e9,
                                  spb_offs_B=200,
                                  num_bufs_offset_B=-1))

# load all:
# suite = unittest.TestLoader().loadTestsFromTestCase(TestGstLALAdder)

runner = unittest.TextTestRunner(verbosity=5)
runner.run(suite)
426 
427