gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
datasource.py
Go to the documentation of this file.
1 # Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 ##
27 # @file
28 #
29 # A file that contains the datasource module code
30 #
31 # ### Review Status
32 #
33 # | Names | Hash | Date |
34 # | ------------------------------------------- | ------------------------------------------- | ---------- |
35 # | Florent, Sathya, Duncan Me., Jolien, Kipp, Chad | b3ef077fe87b597578000f140e4aa780f3a227aa | 2014-05-01 |
36 #
37 # #### Action items
38 #
39 # - State vector, DQ, etc must be in place by November and should be coordinated with JRPC/DAC/DetChar/CBC/etc
40 # - Consider a dynamic data timeout that senses when data is not going to arrive for a while
41 
42 ##
43 # @package python.datasource
44 #
45 # datasource module
46 #
47 
48 
49 import optparse
50 import sys
51 import time
52 
53 # The following snippet is taken from http://gstreamer.freedesktop.org/wiki/FAQ#Mypygstprogramismysteriouslycoredumping.2Chowtofixthis.3F
54 import pygtk
55 pygtk.require("2.0")
56 import gobject
57 gobject.threads_init()
58 import pygst
59 pygst.require('0.10')
60 import gst
61 
62 from gstlal import bottle
63 from gstlal import pipeparts
64 from glue.ligolw.utils import segments as ligolw_segments
65 from glue.ligolw import utils
66 from glue.ligolw import ligolw
67 from glue.ligolw import lsctables
68 from glue import segments
69 import lal
70 from pylal.xlal.datatypes.ligotimegps import LIGOTimeGPS
71 
72 
73 ## #### ContentHandler
74 # A stub to wrap ligolw.LIGOLWContentHandler for now
class ContentHandler(ligolw.LIGOLWContentHandler):
    """!
    Placeholder subclass of ligolw.LIGOLWContentHandler that adds nothing of
    its own.  It is handed to lsctables.use_in() immediately after its
    definition and is the content handler used in this module when loading
    LIGO light-weight XML documents.
    """


lsctables.use_in(ContentHandler)
78 
79 
80 #
81 # Misc useful functions
82 #
83 
def channel_dict_from_channel_list(channel_list):
    """!
    Given a list of channels, produce a dictionary keyed by ifo of channel names:

    The list here typically comes from an option parser with options that
    specify the "append" action.

    @param channel_list iterable of strings of the form "IFO=CHANNEL-NAME"

    Returns a dict mapping each IFO to its CHANNEL-NAME.  Raises ValueError
    if an entry contains no "=".

    Examples:

    >>> sorted(channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"]).items())
    [('H1', 'LSC-STRAIN'), ('H2', 'SOMETHING-ELSE')]
    """
    # Split on the first "=" only, so that a value which itself contains
    # "=" is kept intact instead of producing a 3-element item that dict()
    # rejects.
    return dict(instrument_channel.split("=", 1) for instrument_channel in channel_list)
98 
99 
def pipeline_channel_list_from_channel_dict(channel_dict, ifos = None, opt = "channel-name"):
    """!
    Creates a string of channel names options from a dictionary keyed by ifos.

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as an argument
    that encodes the other instances of the option.

    - override --channel-name with a different option by setting opt.
    - restrict the ifo keys to a subset of the channel_dict by
      setting ifos

    @param channel_dict dictionary of channel names keyed by ifo
    @param ifos iterable of ifo keys to include, in output order; defaults to all keys of channel_dict
    @param opt option name (without the leading "--") used for the second and later entries

    Returns a string in which the first entry is a bare "IFO=CHANNEL " and
    every later entry is "--opt=IFO=CHANNEL "; the empty string if there
    are no ifos.  Raises KeyError if an entry of ifos is not in channel_dict.

    Examples:

    >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H2", "H1"])
    'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '

    >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])
    'H1=LSC-STRAIN '

    >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H2", "H1"], opt="test-string")
    'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
    """
    if ifos is None:
        ifos = channel_dict.keys()
    # Every entry ends in a space, so joining the entries with "--opt="
    # puts the option prefix in front of all but the first one.
    return ("--%s=" % opt).join("%s=%s " % (ifo, channel_dict[ifo]) for ifo in ifos)
133 
134 
## #### Default dictionary of state vector on/off bits by ifo
# Used as the default argument to state_vector_on_off_dict_from_bit_lists()
state_vector_on_off_dict = {
    "H1" : [0x7, 0x160],
    "H2" : [0x7, 0x160],
    "L1" : [0x7, 0x160],
    "V1" : [0x67, 0x100]
}


def _ifo_and_bits_from_option(line):
    # Split an "IFO=bits" option string into (ifo, integer bits).  The bits
    # are read as base 10 first and as base 16 if that fails.
    fields = line.split("=")
    bits = "".join(fields[1:])
    try:
        return fields[0], int(bits)
    except ValueError: # could be hex, that is all we support other than int
        return fields[0], int(bits, 16)


def state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict = state_vector_on_off_dict):
    """!
    Produce a dictionary (keyed by detector) of on / off bit tuples from a
    list provided on the command line.

    Takes default values from module level datasource.state_vector_on_off_dict
    if state_vector_on_off_dict is not given

    Inputs must be given as base 10 or 16 integers

    The dictionary of defaults is copied before the overrides are applied,
    so neither the module level table nor a dictionary supplied by the
    caller is modified; use the return value.

    @param on_bit_list list of "IFO=bits" strings giving the on bits
    @param off_bit_list list of "IFO=bits" strings giving the off bits
    @param state_vector_on_off_dict defaults, as a dictionary of [on, off] lists keyed by ifo

    Returns a new dictionary of [on, off] lists keyed by ifo.  Raises
    ValueError if bits are neither base 10 nor base 16, and KeyError if an
    off-bit entry names an ifo that has neither a default nor an on-bit entry.

    Examples:

    >>> on_bit_list = ["V1=7", "H1=7", "L1=7"]
    >>> off_bit_list = ["V1=256", "H1=352", "L1=352"]
    >>> sorted(state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list).items())
    [('H1', [7, 352]), ('H2', [7, 352]), ('L1', [7, 352]), ('V1', [7, 256])]

    >>> sorted(state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{}).items())
    [('H1', [7, 352]), ('L1', [7, 352]), ('V1', [7, 256])]

    >>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]
    >>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]
    >>> sorted(state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{}).items())
    [('H1', [7, 850]), ('L1', [7, 850]), ('V1', [7, 598])]
    """
    # Work on a copy (of the per-ifo lists too): the default argument is the
    # module level table, and writing into it would make overrides from one
    # call leak into every later call.
    on_off = dict((ifo, list(bits)) for ifo, bits in state_vector_on_off_dict.items())

    for line in on_bit_list:
        ifo, val = _ifo_and_bits_from_option(line)
        if ifo in on_off:
            on_off[ifo][0] = val
        else:
            # unknown detector: start it with no off bits
            on_off[ifo] = [val, 0]

    for line in off_bit_list:
        ifo, val = _ifo_and_bits_from_option(line)
        if ifo not in on_off:
            raise KeyError("off bits given for '%s', which has no default and no on bits" % ifo)
        on_off[ifo][1] = val

    return on_off
192 
def state_vector_on_off_list_from_bits_dict(bit_dict):
    """!
    Produce a tuple of useful command lines from a dictionary of on / off state
    vector bits keyed by detector

    FIXME: This function exists to work around pipeline.py's inability to
    give the same option more than once by producing a string to pass as an argument
    that encodes the other instances of the option.

    @param bit_dict dictionary of [on bits, off bits] keyed by ifo

    Returns the tuple (on string, off string).  In each string the first
    entry is a bare "IFO=bits " and every later entry is prefixed with
    --state-vector-on-bits= or --state-vector-off-bits= respectively.  The
    entries follow the iteration order of bit_dict; both strings are empty
    if bit_dict is empty.

    Examples:

    >>> state_vector_on_off_list_from_bits_dict({"H1":[0x7, 0x160]})
    ('H1=7 ', 'H1=352 ')
    """
    # Walk the dictionary once so that both strings list the detectors in
    # the same order.
    entries = [(ifo, bit_dict[ifo]) for ifo in bit_dict]

    # Every entry ends in a space, so joining with the option prefix puts
    # that prefix in front of all but the first entry.
    onstr = "--state-vector-on-bits=".join("%s=%s " % (ifo, bits[0]) for ifo, bits in entries)
    offstr = "--state-vector-off-bits=".join("%s=%s " % (ifo, bits[1]) for ifo, bits in entries)

    return onstr, offstr
221 
222 
## framexmit ports in use on the LDG
# Look-up table giving, per site and then per instrument name, the
# framexmit (multicast address, port) pair.
#
# used in mkbasicsrc()
#
# FIXME: this is only here temporarily while we test this approach to data
# acquisition. obviously we can't hard-code this stuff
#
framexmit_ports = {
    "CIT": {
        "H1": ("224.3.2.1", 7096),
        "L1": ("224.3.2.2", 7097),
        "V1": ("224.3.2.3", 7098),
    },
}
239 
def framexmit_dict_from_framexmit_list(framexmit_list):
    """!
    Given a list of framexmit addresses with ports, produce a dictionary keyed by ifo:

    The list here typically comes from an option parser with options that
    specify the "append" action.

    @param framexmit_list iterable of strings of the form "IFO=address:port"

    Returns a dictionary of (address, integer port) tuples keyed by ifo.
    Raises ValueError if an entry lacks the "=" or the ":" separator, or if
    the port is not an integer.

    Examples:

    >>> sorted(framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"]).items())
    [('H1', ('224.3.2.1', 7096)), ('L1', ('224.3.2.2', 7097)), ('V1', ('224.3.2.3', 7098))]
    """
    out = {}
    for instrument_addr in framexmit_list:
        # The ifo ends at the first "=" and the port starts after the last
        # ":"; anything in between is taken as the address.
        ifo, addr_port = instrument_addr.split("=", 1)
        addr, port = addr_port.rsplit(":", 1)
        out[ifo] = (addr, int(port))
    return out
259 
260 
def framexmit_list_from_framexmit_dict(framexmit_dict, ifos = None, opt = "framexmit-addr"):
    """!
    Creates a string of framexmit address options from a dictionary keyed by ifos.

    @param framexmit_dict dictionary of (address, port) pairs keyed by ifo
    @param ifos iterable of ifo keys to include, in output order; defaults to all keys of framexmit_dict
    @param opt option name (without the leading "--") used for the second and later entries

    Returns a string in which the first entry is a bare "IFO=address:port "
    and every later entry is "--opt=IFO=address:port "; the empty string if
    there are no ifos.  Raises KeyError if an entry of ifos is not in
    framexmit_dict.

    Examples:

    >>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096)}, ifos=["V1", "H1"])
    'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 '
    """
    if ifos is None:
        ifos = framexmit_dict.keys()
    # Every entry ends in a space, so joining the entries with "--opt="
    # puts the option prefix in front of all but the first one.
    return ("--%s=" % opt).join("%s=%s:%s " % (ifo, framexmit_dict[ifo][0], framexmit_dict[ifo][1]) for ifo in ifos)
280 
281 
def seek_event_for_gps(gps_start_time, gps_end_time, flags = 0):
    """!
    Build a seek event with gst.event_new_seek() (rate 1.0, time format)
    spanning gps_start_time to gps_end_time, with optional flags.

    Either boundary may be None or -1, in which case that end of the seek
    is left unset.

    @param gps_start_time start time as LIGOTimeGPS, double or float
    @param gps_end_time end time as LIGOTimeGPS, double or float
    @param flags seek flags handed to gst.event_new_seek() unchanged
    """
    def seek_args_for_gps(gps_time):
        """!
        Translate one GPS boundary into a (seek type, GStreamer timestamp)
        pair.
        """
        if gps_time is None or gps_time == -1:
            # -1 == gst.CLOCK_TIME_NONE
            return (gst.SEEK_TYPE_NONE, -1)
        if hasattr(gps_time, 'ns'):
            # object that reports its own nanosecond count
            return (gst.SEEK_TYPE_SET, gps_time.ns())
        # plain number of seconds: scale to GStreamer time units
        return (gst.SEEK_TYPE_SET, long(float(gps_time) * gst.SECOND))

    boundaries = seek_args_for_gps(gps_start_time) + seek_args_for_gps(gps_end_time)

    return gst.event_new_seek(1., gst.FORMAT_TIME, flags, *boundaries)
307 
308 
class GWDataSourceInfo(object):
    """!
    Hold the data associated with data source command lines.

    An instance is built from the option values produced by a parser to
    which append_options() was applied.  The constructor checks the options
    for consistency and stores the derived values as attributes.
    """
    ## See datasource.append_options()
    def __init__(self, options):
        """!
        Initialize a GWDataSourceInfo class instance from command line options specified by append_options()

        @param options option values carrying the attributes defined by append_options()

        Raises ValueError if the options are incomplete or inconsistent.
        """

        ## A list of possible, valid data sources ("frames", "framexmit", "lvshm", "nds", "white", "silence", "AdvVirgo", "LIGO", "AdvLIGO")
        self.data_sources = set(("frames", "framexmit", "lvshm", "nds", "white", "silence", "AdvVirgo", "LIGO", "AdvLIGO"))
        ## The data sources for which a GPS start / end time must not be given
        self.live_sources = set(("framexmit", "lvshm"))
        assert self.live_sources <= self.data_sources

        # Sanity check the options
        if options.data_source not in self.data_sources:
            # sorted so that the message does not depend on set ordering
            raise ValueError("--data-source must be one of %s" % ", ".join(sorted(self.data_sources)))
        if options.data_source == "frames" and options.frame_cache is None:
            raise ValueError("--frame-cache must be specified when using --data-source=frames")
        if not options.channel_name:
            raise ValueError("must specify at least one channel in the form --channel-name=IFO=CHANNEL-NAME")
        if options.frame_segments_file is not None and options.data_source != "frames":
            raise ValueError("can only give --frame-segments-file if --data-source=frames")
        if options.frame_segments_name is not None and options.frame_segments_file is None:
            raise ValueError("can only specify --frame-segments-name if --frame-segments-file is given")
        if options.data_source == "nds" and (options.nds_host is None or options.nds_port is None):
            raise ValueError("must specify --nds-host and --nds-port when using --data-source=nds")

        ## A dictionary of the requested channels, e.g., {"H1":"LDAS-STRAIN", "L1":"LDAS-STRAIN"}
        self.channel_dict = channel_dict_from_channel_list(options.channel_name)

        ## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
        self.shm_part_dict = {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
        if options.shared_memory_partition is not None:
            self.shm_part_dict.update( channel_dict_from_channel_list(options.shared_memory_partition) )

        ## A dictionary of framexmit addresses
        # Copy the module level table: the update() below must not write
        # this instance's overrides into the defaults shared by all
        # instances.
        self.framexmit_addr = dict(framexmit_ports["CIT"])
        if options.framexmit_addr is not None:
            self.framexmit_addr.update( framexmit_dict_from_framexmit_list(options.framexmit_addr) )
        self.framexmit_iface = options.framexmit_iface

        ## Seek event. Default is None, i.e., no seek
        self.seekevent = None

        ## Analysis segment. Default is None
        self.seg = None

        if options.gps_start_time is not None:
            if options.gps_end_time is None:
                raise ValueError("must provide both --gps-start-time and --gps-end-time")
            if options.data_source in self.live_sources:
                raise ValueError("cannot set --gps-start-time or --gps-end-time with %s" % " or ".join("--data-source=%s" % src for src in sorted(self.live_sources)))
            try:
                start = LIGOTimeGPS(options.gps_start_time)
            except ValueError:
                raise ValueError("invalid --gps-start-time '%s'" % options.gps_start_time)
            try:
                end = LIGOTimeGPS(options.gps_end_time)
            except ValueError:
                raise ValueError("invalid --gps-end-time '%s'" % options.gps_end_time)
            if start >= end:
                raise ValueError("--gps-start-time must be < --gps-end-time: %s < %s" % (options.gps_start_time, options.gps_end_time))
            ## Segment from gps start and stop time if given
            self.seg = segments.segment(start, end)
            ## Seek event from the gps start and stop time if given
            self.seekevent = gst.event_new_seek(1., gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_KEY_UNIT, gst.SEEK_TYPE_SET, self.seg[0].ns(), gst.SEEK_TYPE_SET, self.seg[1].ns())
        elif options.gps_end_time is not None:
            raise ValueError("must provide both --gps-start-time and --gps-end-time")
        elif options.data_source not in self.live_sources:
            # This branch is reached for the sources that are NOT live, so
            # the times are required "unless" a live source is selected.
            raise ValueError("--gps-start-time and --gps-end-time must be specified unless %s" % " or ".join("--data-source=%s" % src for src in sorted(self.live_sources)))

        if options.frame_segments_file is not None:
            ## Frame segments from a user defined file
            self.frame_segments = ligolw_segments.segmenttable_get_by_name(utils.load_filename(options.frame_segments_file, contenthandler=ContentHandler), options.frame_segments_name).coalesce()
            if self.seg is not None:
                # Clip frame segments to seek segment if it
                # exists (not required, just saves some
                # memory and I/O overhead)
                self.frame_segments = segments.segmentlistdict((instrument, seglist & segments.segmentlist([self.seg])) for instrument, seglist in self.frame_segments.items())
        else:
            ## if no frame segments provided, map every requested instrument to None
            self.frame_segments = segments.segmentlistdict((instrument, None) for instrument in self.channel_dict)

        ## DQ (state vector) channel dictionary, e.g., { "H1": "LLD-DQ_VECTOR", "H2": "LLD-DQ_VECTOR","L1": "LLD-DQ_VECTOR", "V1": "LLD-DQ_VECTOR" }
        self.dq_channel_dict = { "H1": "LLD-DQ_VECTOR", "H2": "LLD-DQ_VECTOR","L1": "LLD-DQ_VECTOR", "V1": "LLD-DQ_VECTOR" }

        ## DQ channel type, e.g., "LLD"
        self.dq_channel_type = "LLD"

        if options.dq_channel_name is not None:
            dq_channel_dict_from_options = channel_dict_from_channel_list( options.dq_channel_name )
            # NOTE: only the first instrument found in the user supplied
            # dictionary is inspected to decide the channel type.
            instrument = list(dq_channel_dict_from_options)[0]
            self.dq_channel_dict.update( dq_channel_dict_from_options )
            dq_channel = self.dq_channel_dict[instrument]
            if "ODC_" in dq_channel.split("-")[1]:
                self.dq_channel_type = "ODC"

        ## Dictionary of state vector on, off bits like {"H1" : [0x7, 0x160], "H2" : [0x7, 0x160], "L1" : [0x7, 0x160], "V1" : [0x67, 0x100]}
        self.state_vector_on_off_bits = state_vector_on_off_dict_from_bit_lists(options.state_vector_on_bits, options.state_vector_off_bits)

        ## frame cache file
        self.frame_cache = options.frame_cache
        ## block size in bytes to read data from disk
        self.block_size = options.block_size
        ## Data source, one of python.datasource.GWDataSourceInfo.data_sources
        self.data_source = options.data_source
        ## Injection file name
        self.injection_filename = options.injections

        if options.data_source == "nds":
            ## Store the ndssrc specific options: host
            self.nds_host = options.nds_host
            ## Store the ndssrc specific options: port
            self.nds_port = options.nds_port
            ## Store the ndssrc specific options: channel_type
            self.nds_channel_type = options.nds_channel_type
427 
428 
def append_options(parser):
    """!
    Append generic data source options to an OptionParser object in order
    to have consistent and unified command lines and parsing throughout the project
    for applications that read GW data.

    @param parser the optparse.OptionParser to which the "Data source options" group is added

    The options are added to the parser in place; nothing is returned.

-   --data-source [string]
        Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO].

-   --block-size [int] (bytes)
        Data block size to read in bytes. Default 16384 * 8 * 512 which is 512 seconds of double
        precision data at 16384 Hz. This parameter is only used if --data-source is one of
        white, silence, AdvVirgo, LIGO, AdvLIGO, nds.

-   --frame-cache [filename]
        Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).
        This is required iff --data-source=frames

-   --gps-start-time [int] (seconds)
        Set the start time of the segment to analyze in GPS seconds.
        Required unless --data-source in lvshm,framexmit

-   --gps-end-time [int] (seconds)
        Set the end time of the segment to analyze in GPS seconds.
        Required unless --data-source in lvshm,framexmit

-   --injections [filename]
        Set the name of the LIGO light-weight XML file from which to load injections (optional).

-   --channel-name [string]
        Set the name of the channels to process.
        Can be given multiple times as --channel-name=IFO=CHANNEL-NAME

-   --nds-host [hostname]
        Set the remote host or IP address that serves nds data.
        This is required iff --data-source is nds

-   --nds-port [portnumber]
        Set the port of the remote host that serves nds data, default = 31200.
        This is required iff --data-source is nds

-   --nds-channel-type [string] type
        Set the channel type requested from the nds server, default = online.
        Only used if --data-source is nds.
        FIXME please document the accepted values

-   --framexmit-addr [string]
        Set the address of the framexmit service.  Can be given
        multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port

-   --framexmit-iface [string]
        Set the address of the framexmit interface.

-   --dq-channel-name [string]
        Set the name of the data quality (or state vector) channel.
        This channel will be used to control the flow of data via the on/off bits.
        Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME

-   --shared-memory-partition [string]
        Set the name of the shared memory partition for a given instrument.
        Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME

-   --frame-segments-file [filename]
        Set the name of the LIGO light-weight XML file from which to load frame segments.
        Optional iff --data-source is frames

-   --frame-segments-name [string]
        Set the name of the segments to extract from the segment tables.
        Required iff --frame-segments-file is given

-   --state-vector-on-bits [hex]
        Set the state vector on bits to process (optional).
        The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
        Only currently has meaning for online (lvshm, framexmit) data

-   --state-vector-off-bits [hex]
        Set the state vector off bits to process (optional).
        The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times.
        Only currently has meaning for online (lvshm, framexmit) data

    #### Typical usage case examples

    -# Reading data from frames

            --data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 --channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml --frame-segments-name=datasegments

    -# Reading data from a fake LIGO source

            --data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 --channel-name=H1=FAIKE-STRAIN

    -# Reading online data via framexmit

            --data-source=framexmit --channel-name=H1=FAIKE-STRAIN

    -# Many other combinations possible, please add some!
    """
    group = optparse.OptionGroup(parser, "Data source options", "Use these options to set up the appropriate data source")
    group.add_option("--data-source", metavar = "source", help = "Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO]. Required.")
    group.add_option("--block-size", type = "int", metavar = "bytes", default = 16384 * 8 * 512, help = "Data block size to read in bytes. Default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz). This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.")
    group.add_option("--frame-cache", metavar = "filename", help = "Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional). This is required iff --data-source=frames")
    # The GPS range is required for every source except the live ones
    # (lvshm and framexmit).
    group.add_option("--gps-start-time", metavar = "seconds", help = "Set the start time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit")
    group.add_option("--gps-end-time", metavar = "seconds", help = "Set the end time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit")
    group.add_option("--injections", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load injections (optional).")
    group.add_option("--channel-name", metavar = "name", action = "append", help = "Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
    group.add_option("--nds-host", metavar = "hostname", help = "Set the remote host or IP address that serves nds data. This is required iff --data-source=nds")
    group.add_option("--nds-port", metavar = "portnumber", type = "int", default = 31200, help = "Set the port of the remote host that serves nds data. This is required iff --data-source=nds")
    group.add_option("--nds-channel-type", metavar = "type", default = "online", help = "Set the channel type requested from the nds server. This is used only if --data-source=nds. default==online")
    group.add_option("--framexmit-addr", metavar = "name", action = "append", help = "Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
    group.add_option("--framexmit-iface", metavar = "name", help = "Set the multicast interface address of the framexmit service.")
    group.add_option("--dq-channel-name", metavar = "name", action = "append", help = "Set the name of the data quality (or state vector) channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME")
    group.add_option("--shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
    group.add_option("--frame-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load frame segments. Optional iff --data-source=frames")
    group.add_option("--frame-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables. Required iff --frame-segments-file is given")
    group.add_option("--state-vector-on-bits", metavar = "bits", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm, framexmit) data.")
    group.add_option("--state-vector-off-bits", metavar = "bits", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm, framexmit) data.")
    parser.add_option_group(group)
543 
544 
545 ## @cond DONTDOCUMENT
def do_seek(pipeline, seekevent):
    """!
    Put every source element of the pipeline into the READY state and send
    it the seek event.  Raises RuntimeError for the first element that does
    not complete the state change or does not handle the event.
    """
    # FIXME: remove. seek the pipeline instead
    # DO NOT USE IN NEW CODE!!!!
    for element in pipeline.iterate_sources():
        state_change = element.set_state(gst.STATE_READY)
        if state_change != gst.STATE_CHANGE_SUCCESS:
            raise RuntimeError("Element %s did not want to enter ready state" % element.get_name())
        handled = element.send_event(seekevent)
        if not handled:
            raise RuntimeError("Element %s did not handle seek event" % element.get_name())
554 ## @endcond
555 
556 
557 ##
558 # _Gstreamer graph describing this function:_
559 #
560 # @dot
561 # digraph G {
562 # compound=true;
563 # node [shape=record fontsize=10 fontname="Verdana"];
564 # rankdir=LR;
565 # lal_gate;
566 # lal_segmentsrc [URL="\ref pipeparts.mksegmentsrc()"];
567 # lal_gate [URL="\ref pipeparts.mkgate()"];
568 # in [label="?"];
569 # out [label="?"];
570 # in -> lal_gate -> out;
571 # lal_segmentsrc -> lal_gate;
572 # }
573 # @enddot
574 #
575 #
def mksegmentsrcgate(pipeline, src, segment_list, seekevent = None, invert_output = False, **kwargs):
    """!
    Build a gate on src whose control input is a segment source made from
    segment_list. Hook up your own input and output.

    @param pipeline the pipeline the new elements are added to
    @param src the element whose output is gated
    @param segment_list the segments handed to pipeparts.mksegmentsrc()
    @param seekevent if not None, do_seek() is called with it before the gate is made
    @param invert_output passed through to pipeparts.mksegmentsrc()
    @param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate's name.

    Returns the result of pipeparts.mkgate().
    """
    control_src = pipeparts.mksegmentsrc(pipeline, segment_list, invert_output = invert_output)
    # FIXME: remove
    if seekevent is not None:
        do_seek(pipeline, seekevent)
    gate = pipeparts.mkgate(pipeline, src, threshold = 1, control = control_src, **kwargs)
    return gate
587 
588 
589 ##
590 # _Gstreamer graph describing this function:_
591 #
592 # @dot
593 # digraph mkbasicsrc {
594 # compound=true;
595 # node [shape=record fontsize=10 fontname="Verdana"];
596 # subgraph clusterfakesrc {
597 # fake_0 [label="fakesrc: white, silence, AdvVirgo, LIGO, AdvLIGO", URL="\ref pipeparts.mkfakesrc()"];
598 # color=black;
599 # label="Possible path #1";
600 # }
601 # subgraph clusterframes {
602 # color=black;
603 # frames_0 [label="lalcachesrc: frames", URL="\ref pipeparts.mklalcachesrc()"];
604 # frames_1 [label ="framecppchanneldemux", URL="\ref pipeparts.mkframecppchanneldemux()"];
605 # frames_2 [label ="queue", URL="\ref pipeparts.mkqueue()"];
606 # frames_3 [label ="gate (if user provides segments)", style=filled, color=lightgrey, URL="\ref pipeparts.mkgate()"];
607 # frames_4 [label ="audiorate", URL="\ref pipeparts.mkaudiorate()"];
608 # frames_0 -> frames_1 -> frames_2 -> frames_3 ->frames_4;
609 # label="Possible path #2";
610 # }
611 # subgraph clusteronline {
612 # color=black;
613 # online_0 [label="lvshmsrc|framexmit", URL="\ref pipeparts.mklvshmsrc()"];
614 # online_1 [label ="framecppchanneldemux", URL="\ref pipeparts.mkframecppchanneldemux()"];
615 # online_2a [label ="strain queue", URL="\ref pipeparts.mkqueue()"];
616 # online_2b [label ="statevector queue", URL="\ref pipeparts.mkqueue()"];
617 # online_3 [label ="statevector", URL="\ref pipeparts.mkstatevector()"];
618 # online_4 [label ="gate", URL="\ref pipeparts.mkgate()"];
619 # online_5 [label ="audiorate", URL="\ref pipeparts.mkaudiorate()"];
620 # online_6 [label ="queue", URL="\ref pipeparts.mkqueue()"];
621 # online_0 -> online_1;
622 # online_1 -> online_2a;
623 # online_1 -> online_2b;
624 # online_2b -> online_3;
625 # online_2a -> online_4;
626 # online_3 -> online_4 -> online_5 -> online_6;
627 # label="Possible path #3";
628 # }
629 # subgraph clusternds {
630 # nds_0 [label="ndssrc", URL="\ref pipeparts.mkndssrc()"];
631 # color=black;
632 # label="Possible path #4";
633 # }
634 # audioconv [label="audioconvert", URL="\ref pipeparts.mkaudioconvert()"];
635 # progress [label="progressreport (if verbose)", style=filled, color=lightgrey, URL="\ref pipeparts.mkprogressreport()"];
636 # sim [label="lalsimulation (if injections requested)", style=filled, color=lightgrey, URL="\ref pipeparts.mkinjections()"];
637 # queue [label="queue (if injections requested)", style=filled, color=lightgrey, URL="\ref pipeparts.mkqueue()"];
638 #
639 # // The connections
640 # fake_0 -> audioconv [ltail=clusterfakesrc];
641 # frames_4 -> audioconv [ltail=clusterframes];
642 # online_6 -> audioconv [ltail=clusteronline];
643 # nds_0 -> audioconv [ltail=clusternds];
644 # audioconv -> progress -> sim -> queue -> "?"
645 # }
646 # @enddot
647 #
648 #
649 def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = False):
650  """!
651  All the conditionals and stupid pet tricks for reading real or
652  simulated h(t) data in one place.
653 
654  Consult the append_options() function and the GWDataSourceInfo class
655 
656  This src in general supports only one instrument although
657  GWDataSourceInfo contains dictionaries of multi-instrument things. By
658  specifying the instrument when calling this function you will get ony a single
659  instrument source. A code wishing to have multiple basicsrcs will need to call
660  this function for each instrument.
661  """
662 
663  if gw_data_source_info.data_source == "white":
664  src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, volume = 1.0)
665  elif gw_data_source_info.data_source == "silence":
666  src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, wave = 4)
667  elif gw_data_source_info.data_source == "LIGO":
668  src = pipeparts.mkfakeLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
669  elif gw_data_source_info.data_source == "AdvLIGO":
670  src = pipeparts.mkfakeadvLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
671  elif gw_data_source_info.data_source == "AdvVirgo":
672  src = pipeparts.mkfakeadvvirgosrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
673  elif gw_data_source_info.data_source == "frames":
674  if instrument == "V1":
675  #FIXME Hack because virgo often just uses "V" in the file names rather than "V1". We need to sieve on "V"
676  src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex = "V")
677  else:
678  src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex = instrument[0], cache_dsc_regex = instrument)
679  demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = True, channel_list = map("%s:%s".__mod__, gw_data_source_info.channel_dict.items()))
680  pipeparts.framecpp_channeldemux_set_units(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))
681  # allow frame reading and decoding to occur in a different
682  # thread
683  src = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * gst.SECOND)
684  pipeparts.src_deferred_link(demux, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), src.get_pad("sink"))
685  if gw_data_source_info.frame_segments[instrument] is not None:
686  # FIXME: make segmentsrc generate segment samples at the sample rate of h(t)?
687  # FIXME: make gate leaky when I'm certain that will work.
688  src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mksegmentsrc(pipeline, gw_data_source_info.frame_segments[instrument]), name = "%s_frame_segments_gate" % instrument)
689  pipeparts.framecpp_channeldemux_check_segments.set_probe(src.get_pad("src"), gw_data_source_info.frame_segments[instrument])
690  # FIXME: remove this when pipeline can handle disconts
691  src = pipeparts.mkaudiorate(pipeline, src, skip_to_first = True, silent = False)
692  elif gw_data_source_info.data_source in ("framexmit", "lvshm"):
693  # See https://wiki.ligo.org/DAC/ER2DataDistributionPlan#LIGO_Online_DQ_Channel_Specifica
694  state_vector_on_bits, state_vector_off_bits = gw_data_source_info.state_vector_on_off_bits[instrument]
695 
696  if gw_data_source_info.data_source == "lvshm":
697  # FIXME make wait_time adjustable through web interface or command line or both
698  src = pipeparts.mklvshmsrc(pipeline, shm_name = gw_data_source_info.shm_part_dict[instrument], wait_time = 120)
699  elif gw_data_source_info.data_source == "framexmit":
700  src = pipeparts.mkframexmitsrc(pipeline, multicast_iface = gw_data_source_info.framexmit_iface, multicast_group = gw_data_source_info.framexmit_addr[instrument][0], port = gw_data_source_info.framexmit_addr[instrument][1], wait_time = 120)
701  else:
702  # impossible code path
703  raise ValueError(gw_data_source_info.data_source)
704 
705  src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = True, skip_bad_files = True)
706  pipeparts.framecpp_channeldemux_set_units(src, {"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]): "strain"})
707 
708  # strain
709  strain = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND * 60 * 1) # 1 minutes of buffering
710  pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), strain.get_pad("sink"))
711  # state vector
712  # FIXME: don't hard-code channel name
713  statevector = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND * 60 * 1) # 1 minutes of buffering
714  pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.dq_channel_dict[instrument]), statevector.get_pad("sink"))
715  if gw_data_source_info.dq_channel_type == "ODC" or gw_data_source_info.dq_channel_dict[instrument] == "Hrec_Flag_Quality":
716  # FIXME: This goes away when the ODC channel format is fixed.
717  statevector = pipeparts.mkgeneric(pipeline, statevector, "lal_fixodc")
718  statevector = pipeparts.mkstatevector(pipeline, statevector, required_on = state_vector_on_bits, required_off = state_vector_off_bits)
719  @bottle.route("/%s/state_vector_on_off_gap.txt" % instrument)
720  def state_vector_state(elem = statevector):
721  t = float(lal.UTCToGPS(time.gmtime()))
722  on = elem.get_property("on-samples")
723  off = elem.get_property("off-samples")
724  gap = elem.get_property("gap-samples")
725  return "%.9f %d %d %d" % (t, on, off, gap)
726 
727  # use state vector to gate strain
728  src = pipeparts.mkgate(pipeline, strain, threshold = 1, control = statevector, default_state = False, name = "%s_state_vector_gate" % instrument)
729 
730  # fill in holes, skip duplicate data
731  src = pipeparts.mkaudiorate(pipeline, src, skip_to_first = True, silent = False)
732  @bottle.route("/%s/strain_add_drop.txt" % instrument)
733  def strain_add(elem = src):
734  t = float(lal.UTCToGPS(time.gmtime()))
735  add = elem.get_property("add")
736  drop = elem.get_property("drop")
737  # FIXME don't hard code the sample rate
738  return "%.9f %d %d" % (t, add / 16384., drop / 16384.)
739 
740  # 10 minutes of buffering
741  src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND * 60 * 10)
742  elif gw_data_source_info.data_source == "nds":
743  src = pipeparts.mkndssrc(pipeline, gw_data_source_info.nds_host, instrument, gw_data_source_info.channel_dict[instrument], gw_data_source_info.nds_channel_type, blocksize = gw_data_source_info.block_size, port = gw_data_source_info.nds_port)
744  else:
745  raise ValueError("invalid data_source: %s" % gw_data_source_info.data_source)
746 
747  #
748  # provide an audioconvert element to allow Virgo data (which is
749  # single-precision) to be adapted into the pipeline
750  #
751 
752  src = pipeparts.mkaudioconvert(pipeline, src)
753 
754  #
755  # progress report
756  #
757 
758  if verbose:
759  src = pipeparts.mkprogressreport(pipeline, src, "progress_src_%s" % instrument)
760 
761  #
762  # optional injections
763  #
764 
765  if gw_data_source_info.injection_filename is not None:
766  src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
767  # let the injection code run in a different thread than the
768  # whitener, etc.,
769  src = pipeparts.mkqueue(pipeline, src, max_size_bytes = 0, max_size_buffers = 0, max_size_time = gst.SECOND * 64)
770 
771  #
772  # seek the pipeline
773  # FIXME: remove
774  #
775 
776  if gw_data_source_info.data_source in ("white", "silence", "LIGO", "AdvLIGO", "AdvVirgo", "frames"):
777  do_seek(pipeline, gw_data_source_info.seekevent)
778 
779  #
780  # done
781  #
782 
783  return src
784 
785 
786 ##
787 # _Gstreamer graph describing the pipeline_
788 #
789 # @dot
790 # digraph G {
791 # compound=true;
792 # node [shape=record fontsize=10 fontname="Verdana"];
793 # rankdir=LR;
794 # tee [URL="\ref pipeparts.mktee()"];
795 # inputqueue [URL="\ref pipeparts.mkqueue()"];
796 # lal_gate [URL="\ref pipeparts.mkgate()"];
797 # in [label="?"];
798 # out [label="?"];
799 # in -> tee -> inputqueue -> lal_gate -> out;
800 # tee -> lal_gate;
801 # }
802 # @enddot
803 #
804 #
def mkhtgate(pipeline, src, control = None, threshold = 8.0, attack_length = 128, hold_length = 128, **kwargs):
    """!
    Convenience wrapper that applies a threshold gate to input data, e.g.
    to remove large spikes / glitches.  Nothing here is specific to that
    use:  any stream can be supplied as the input and as the control.

    @param pipeline the pipeline the new elements are created in
    @param src the element providing the data to be gated
    @param control the element providing the gate's control stream; when
    None, src is split with a tee and used as its own control
    @param threshold forwarded to pipeparts.mkgate() as threshold
    @param attack_length forwarded to pipeparts.mkgate() with its sign
    flipped
    @param hold_length forwarded to pipeparts.mkgate() with its sign
    flipped
    @param kwargs any additional keyword arguments are handed to
    pipeparts.mkgate() unchanged

    @return whatever pipeparts.mkgate() returns for the assembled gate

    NOTE: the queue placed in front of the gate holds at most one second
    of data, so the attack and hold lengths combined are assumed to be
    less than 1 second in duration.
    """
    # FIXME someday explore a good bandpass filter
    # src = pipeparts.mkaudiochebband(pipeline, src, low_frequency, high_frequency)
    if control is None:
        # no separate control stream was supplied:  tee the input so the
        # same data both feeds the gate and controls it
        src = pipeparts.mktee(pipeline, src)
        control = src

    # only the data branch is queued here; the control is handed to the
    # gate as-is
    queued = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND)

    return pipeparts.mkgate(
        pipeline,
        queued,
        threshold = threshold,
        control = control,
        attack_length = -attack_length,
        hold_length = -hold_length,
        invert_control = True,
        **kwargs
    )
820 
# Self-test:  run this module's doctests when it is executed as a script
if __name__ == "__main__":
    import doctest
    doctest.testmod()