gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
whiten_test_01.py
1 #!/usr/bin/env python
2 # Copyright (C) 2009,2010 Kipp Cannon
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; either version 2 of the License, or (at your
7 # option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
12 # Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc.,
16 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 import numpy
28 import sys
29 from gstlal import pipeparts
30 from gstlal.pipeparts import gst
31 import cmp_nxydumps
32 import test_common
33 
34 
35 #
36 # =============================================================================
37 #
38 # Pipelines
39 #
40 # =============================================================================
41 #
42 
43 
44 #
45 # is the whiten element an identity transform when given a unit PSD? in
46 # and out timeseries should be identical modulo FFT precision and start-up
47 # and shut-down transients.
48 #
49 
50 
51 def whiten_test_01a(pipeline, name):
52  #
53  # signal handler to construct a new unit PSD (with LAL's
54  # normalization) whenever the frequency resolution or Nyquist
55  # frequency changes. LAL's normalization is such that the integral
56  # of the PSD yields the variance in the time domain, therefore
57  #
58  # PSD = 1 / (n \Delta f)
59  #
60 
61  def psd_resolution_changed(elem, pspec, ignored):
62  delta_f = elem.get_property("delta-f")
63  f_nyquist = elem.get_property("f-nyquist")
64  n = int(round(f_nyquist / delta_f) + 1)
65  elem.set_property("mean-psd", numpy.ones((n,), dtype = "double") / (n * delta_f))
66 
67  #
68  # try changing these. test should still work!
69  #
70 
71  rate = 2048 # Hz
72  zero_pad = 0.0 # seconds
73  fft_length = 4.0 # seconds
74  buffer_length = 1.0 # seconds
75  test_duration = 100.0 # seconds
76 
77  #
78  # build pipeline
79  #
80 
81  head = test_common.test_src(pipeline, buffer_length = buffer_length, rate = rate, test_duration = test_duration, wave = 9)
82  head = pipeparts.mkgeneric(pipeline, head, "audiocheblimit", mode = 1, cutoff = 0.25)
83  head = tee = pipeparts.mktee(pipeline, head)
84  head = pipeparts.mkwhiten(pipeline, head, psd_mode = 1, zero_pad = zero_pad, fft_length = fft_length)
85  head.connect_after("notify::f-nyquist", psd_resolution_changed, None)
86  head.connect_after("notify::delta-f", psd_resolution_changed, None)
87  head = pipeparts.mkchecktimestamps(pipeline, head)
88  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, head), "%s_out.dump" % name)
89  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, tee, max_size_time = int(fft_length * gst.SECOND)), "%s_in.dump" % name)
90 
91  #
92  # done
93  #
94 
95  return pipeline
96 
97 
98 #
99 # does the whitener turn coloured Gaussian noise into zero-mean,
100 # unit-variance stationary white Gaussian noise?
101 #
102 
103 
104 def whiten_test_01b(pipeline, name):
105  #
106  # try changing these. test should still work!
107  #
108 
109  rate = 2048 # Hz
110  zero_pad = 0.0 # seconds
111  fft_length = 4.0 # seconds
112  buffer_length = 1.0 # seconds
113  test_duration = 10000.0 # seconds
114 
115  #
116  # build pipeline
117  #
118 
119  head = test_common.test_src(pipeline, buffer_length = buffer_length, rate = rate, test_duration = test_duration, wave = 6)
120  head = pipeparts.mkwhiten(pipeline, head, psd_mode = 0, zero_pad = zero_pad, fft_length = fft_length)
121  head = pipeparts.mkchecktimestamps(pipeline, head)
122  pipeparts.mknxydumpsink(pipeline, pipeparts.mkqueue(pipeline, head), "%s_out.dump" % name)
123 
124  #
125  # done
126  #
127 
128  return pipeline
129 
130 
131 #
132 # =============================================================================
133 #
134 # Main
135 #
136 # =============================================================================
137 #
138 
139 
140 test_common.build_and_run(whiten_test_01a, "whiten_test_01a")
141 test_common.build_and_run(whiten_test_01b, "whiten_test_01b")
142 
143 cmp_nxydumps.compare("whiten_test_01a_in.dump", "whiten_test_01a_out.dump", transients = (2.0, 2.0), sample_fuzz = 1e-2)