gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
lal_checktimestamps_test_01.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2013 Kipp Cannon
3 # Copyright (C) 2014 Chad Hanna
4 #
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2 of the License, or (at your
8 # option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13 # Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 
19 #
20 # =============================================================================
21 #
22 # Preamble
23 #
24 # =============================================================================
25 #
26 
27 
28 import numpy
29 import sys
30 import test_common
31 
32 import pygtk
33 pygtk.require("2.0")
34 import gobject
35 import pygst
36 pygst.require("0.10")
37 import gst
38 gobject.threads_init()
39 
40 from gstlal import pipeparts
41 from gstlal import simplehandler
42 import signal
43 import time
44 
45 ## @file lal_checktimestamps_test_01.py
46 # A program to intentionally break a running stream in order to test lal_checktimestamps; see lal_checktimestamps_test_01 for more details
47 
48 #
49 # Set up a signal handler to introduce a shift with ctrl+C
50 #
51 
52 ## @package lal_checktimestamps_test_01
53 #
54 # ### USAGE
55 #
56 # - This test program dynamically increases the time shift by one nanosecond every time the user hits ctrl+C (1 ns on the first press, 2 ns on the second, and so on). Because ctrl+C is intercepted, you need to do kill -9 to stop the program ;) Here is an example session:
57 #
58 # ./lal_checktimestamps_test_01.py
59 # src (00:00:05): 5 seconds
60 # src (00:00:10): 10 seconds
61 # ^Cshifting by 1 ns
62 # lal_checktimestamps+lal_checktimestamps0: discontinuity: timestamp = 12.000682382 s, offset = 24576; would have been 12.000682381 s, offset = 24576
63 # src (00:00:15): 15 seconds
64 # ^Cshifting by 2 ns
65 # lal_checktimestamps+lal_checktimestamps0: discontinuity: timestamp = 17.000682383 s, offset = 34816; would have been 17.000682382 s, offset = 34816
66 # src (00:00:20): 20 seconds
67 # ^Cshifting by 3 ns
68 # lal_checktimestamps+lal_checktimestamps0: discontinuity: timestamp = 22.000682384 s, offset = 45056; would have been 22.000682383 s, offset = 45056
69 # ^Z
70 #
71 
72 
class SigHandler(object):
    """Callable SIGINT handler that bumps the pipeline's time shift.

    On construction the instance registers itself as the handler for
    SIGINT.  Each time the signal arrives, the "shift" property of the
    pipeline element named "shift" is set to the current shift value,
    which is then incremented by one for the next signal.
    """

    def __init__(self, pipeline):
        """Store the pipeline and install this object as SIGINT handler.

        pipeline -- object providing get_by_name(); it must contain an
        element named "shift" by the time the first signal is delivered.
        """
        self.pipeline = pipeline
        # value applied on the next SIGINT
        self.shift = 1
        # register last, so the attributes above exist before the
        # handler can possibly be invoked
        signal.signal(signal.SIGINT, self)

    def __call__(self, signum, frame):
        """Apply the current shift to the "shift" element and advance it.

        signum and frame are supplied by the signal machinery and are
        not used.
        """
        amount = self.shift
        # single parenthesised argument: same output as the print
        # statement form
        print("shifting by %d ns" % amount)
        shift_element = self.pipeline.get_by_name("shift")
        shift_element.set_property("shift", amount)
        self.shift = amount + 1
83 
# create the pipeline and an event loop running on its own main context
pipeline = gst.Pipeline()
mainloop = gobject.MainLoop(context = gobject.MainContext())

# build the test pipeline:  test source -> shift -> checktimestamps -> fakesink
src = test_common.test_src(pipeline, test_duration = 100.0, is_live = True)
# the shift is in nanoseconds and starts at 0;  the element is named
# "shift" so that SigHandler can look it up by name later
shifted = pipeparts.mkshift(pipeline, src, shift = 0, name = "shift")
head = pipeparts.mkchecktimestamps(pipeline, shifted)
pipeparts.mkfakesink(pipeline, head)

# attach the pipeline handler and the SIGINT handler, then start the
# pipeline
handler = simplehandler.Handler(mainloop, pipeline)
sighand = SigHandler(pipeline)
state_change = pipeline.set_state(gst.STATE_PLAYING)
if state_change == gst.STATE_CHANGE_FAILURE:
    raise RuntimeError("pipeline failed to enter PLAYING state")

# hand control to the event loop
mainloop.run()