57 gobject.threads_init()
62 from gstlal
import bottle
63 from gstlal
import pipeparts
64 from glue.ligolw.utils
import segments
as ligolw_segments
65 from glue.ligolw
import utils
66 from glue.ligolw
import ligolw
67 from glue.ligolw
import lsctables
68 from glue
import segments
70 from pylal.xlal.datatypes.ligotimegps
import LIGOTimeGPS
77 lsctables.use_in(ContentHandler)
87 Given a list of channels, produce a dictionary keyed by ifo of channel names:
89 The list here typically comes from an option parser with options that
90 specify the "append" action.
94 >>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
95 {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}
97 return dict(instrument_channel.split(
"=")
for instrument_channel
in channel_list)
102 Creates a string of channel names options from a dictionary keyed by ifos.
104 FIXME: This function exists to work around pipeline.py's inability to
105 give the same option more than once by producing a string to pass as an argument
106 that encodes the other instances of the option.
108 - override --channel-name with a different option by setting opt.
109 - restrict the ifo keys to a subset of the channel_dict by
114 >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'})
115 'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
117 >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])
120 >>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, opt="test-string")
121 'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
125 ifos = channel_dict.keys()
126 for i, ifo
in enumerate(ifos):
128 outstr +=
"%s=%s " % (ifo, channel_dict[ifo])
130 outstr +=
"--%s=%s=%s " % (opt, ifo, channel_dict[ifo])
137 state_vector_on_off_dict = {
147 Produce a dictionary (keyed by detector) of on / off bit tuples from a
148 list provided on the command line.
150 Takes default values from module level datasource.state_vector_on_off_dict
151 if state_vector_on_off_dict is not given
153 Inputs must be given as base 10 or 16 integers
157 >>> on_bit_list = ["V1=7", "H1=7", "L1=7"]
158 >>> off_bit_list = ["V1=256", "H1=352", "L1=352"]
159 >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list)
160 {'H2': [7, 352], 'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
162 >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})
163 {'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
165 >>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]
166 >>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]
167 >>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})
168 {'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
170 for line
in on_bit_list:
171 ifo = line.split(
"=")[0]
172 bits =
"".join(line.split(
"=")[1:])
178 state_vector_on_off_dict[ifo][0] = val
180 state_vector_on_off_dict[ifo] = [val, 0]
182 for line
in off_bit_list:
183 ifo = line.split(
"=")[0]
184 bits =
"".join(line.split(
"=")[1:])
187 state_vector_on_off_dict[ifo][1] = int(bits)
189 state_vector_on_off_dict[ifo][1] = int(bits, 16)
191 return state_vector_on_off_dict
196 Produce a tuple of useful command lines from a dictionary of on / off state
197 vector bits keyed by detector
199 FIXME: This function exists to work around pipeline.py's inability to
200 give the same option more than once by producing a string to pass as an argument
201 that encodes the other instances of the option.
205 >>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "H2":[0x7, 0x160], "L1":[0x7, 0x160], "V1":[0x67, 0x100]}
206 >>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict)
207 ('H2=7 --state-vector-on-bits=V1=103 --state-vector-on-bits=H1=7 --state-vector-on-bits=L1=7 ', 'H2=352 --state-vector-off-bits=V1=256 --state-vector-off-bits=H1=352 --state-vector-off-bits=L1=352 ')
212 for i, ifo
in enumerate(bit_dict):
214 onstr +=
"%s=%s " % (ifo, bit_dict[ifo][0])
215 offstr +=
"%s=%s " % (ifo, bit_dict[ifo][1])
217 onstr +=
"--state-vector-on-bits=%s=%s " % (ifo, bit_dict[ifo][0])
218 offstr +=
"--state-vector-off-bits=%s=%s " % (ifo, bit_dict[ifo][1])
234 "H1": (
"224.3.2.1", 7096),
235 "L1": (
"224.3.2.2", 7097),
236 "V1": (
"224.3.2.3", 7098),
243 Given a list of framexmit addresses with ports, produce a dictionary keyed by ifo:
245 The list here typically comes from an option parser with options that
246 specify the "append" action.
250 >>> framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"])
251 {'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)}
254 for instrument_addr
in framexmit_list:
255 ifo, addr_port = instrument_addr.split(
"=")
256 addr, port = addr_port.split(
':')
257 out.append((ifo, (addr, int(port))))
263 Creates a string of framexmit address options from a dictionary keyed by ifos.
267 >>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)})
268 'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 --framexmit-addr=L1=224.3.2.2:7097 '
272 ifos = framexmit_dict.keys()
273 for i, ifo
in enumerate(ifos):
275 outstr +=
"%s=%s:%s " % (ifo, framexmit_dict[ifo][0], framexmit_dict[ifo][1])
277 outstr +=
"--%s=%s=%s:%s " % (opt, ifo, framexmit_dict[ifo][0], framexmit_dict[ifo][1])
284 Create a new seek event, i.e., gst.event_new_seek() for a given
285 gps_start_time and gps_end_time, with optional flags.
287 @param gps_start_time start time as LIGOTimeGPS, double or float
288 @param gps_end_time end time as LIGOTimeGPS, double or float
290 def seek_args_for_gps(gps_time):
292 Convenience routine to convert a GPS time to a seek type and a
296 if gps_time
is None or gps_time == -1:
297 return (gst.SEEK_TYPE_NONE, -1)
298 elif hasattr(gps_time,
'ns'):
299 return (gst.SEEK_TYPE_SET, gps_time.ns())
301 return (gst.SEEK_TYPE_SET, long(float(gps_time) * gst.SECOND))
303 start_type, start_time = seek_args_for_gps(gps_start_time)
304 stop_type, stop_time = seek_args_for_gps(gps_end_time)
306 return gst.event_new_seek(1., gst.FORMAT_TIME, flags, start_type, start_time, stop_type, stop_time)
311 Hold the data associated with data source command lines.
316 Initialize a GWDataSourceInfo class instance from command line options specified by append_options()
320 self.
data_sources = set((
"frames",
"framexmit",
"lvshm",
"nds",
"white",
"silence",
"AdvVirgo",
"LIGO",
"AdvLIGO"))
326 raise ValueError(
"--data-source must be one of %s" %
", ".join(self.
data_sources))
327 if options.data_source ==
"frames" and options.frame_cache
is None:
328 raise ValueError(
"--frame-cache must be specified when using --data-source=frames")
329 if not options.channel_name:
330 raise ValueError(
"must specify at least one channel in the form --channel-name=IFO=CHANNEL-NAME")
331 if options.frame_segments_file
is not None and options.data_source !=
"frames":
332 raise ValueError(
"can only give --frame-segments-file if --data-source=frames")
333 if options.frame_segments_name
is not None and options.frame_segments_file
is None:
334 raise ValueError(
"can only specify --frame-segments-name if --frame-segments-file is given")
335 if options.data_source ==
"nds" and (options.nds_host
is None or options.nds_port
is None):
336 raise ValueError(
"must specify --nds-host and --nds-port when using --data-source=nds")
342 self.
shm_part_dict = {
"H1":
"LHO_Data",
"H2":
"LHO_Data",
"L1":
"LLO_Data",
"V1":
"VIRGO_Data"}
343 if options.shared_memory_partition
is not None:
348 if options.framexmit_addr
is not None:
358 if options.gps_start_time
is not None:
359 if options.gps_end_time
is None:
360 raise ValueError(
"must provide both --gps-start-time and --gps-end-time")
362 raise ValueError(
"cannot set --gps-start-time or --gps-end-time with %s" %
" or ".join(
"--data-source=%s" % src
for src
in sorted(self.
live_sources)))
364 start = LIGOTimeGPS(options.gps_start_time)
366 raise ValueError(
"invalid --gps-start-time '%s'" % options.gps_start_time)
368 end = LIGOTimeGPS(options.gps_end_time)
370 raise ValueError(
"invalid --gps-end-time '%s'" % options.gps_end_time)
372 raise ValueError(
"--gps-start-time must be < --gps-end-time: %s < %s" % (options.gps_start_time, options.gps_end_time))
374 self.
seg = segments.segment(LIGOTimeGPS(options.gps_start_time), LIGOTimeGPS(options.gps_end_time))
376 self.
seekevent = gst.event_new_seek(1., gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_KEY_UNIT, gst.SEEK_TYPE_SET, self.
seg[0].ns(), gst.SEEK_TYPE_SET, self.
seg[1].ns())
377 elif options.gps_end_time
is not None:
378 raise ValueError(
"must provide both --gps-start-time and --gps-end-time")
380 raise ValueError(
"--gps-start-time and --gps-end-time must be specified when %s" %
" or ".join(
"--data-source=%s" % src
for src
in sorted(self.
live_sources)))
382 if options.frame_segments_file
is not None:
384 self.
frame_segments = ligolw_segments.segmenttable_get_by_name(utils.load_filename(options.frame_segments_file, contenthandler=ContentHandler), options.frame_segments_name).coalesce()
385 if self.
seg is not None:
389 self.
frame_segments = segments.segmentlistdict((instrument, seglist & segments.segmentlist([self.
seg]))
for instrument, seglist
in self.frame_segments.items())
395 self.
dq_channel_dict = {
"H1":
"LLD-DQ_VECTOR",
"H2":
"LLD-DQ_VECTOR",
"L1":
"LLD-DQ_VECTOR",
"V1":
"LLD-DQ_VECTOR" }
400 if options.dq_channel_name
is not None:
402 instrument = dq_channel_dict_from_options.keys()[0]
403 self.dq_channel_dict.update( dq_channel_dict_from_options )
405 if "ODC_" in dq_channel.split(
"-")[1]:
420 if options.data_source ==
"nds":
431 Append generic data source options to an OptionParser object in order
432 to have consistent and unified command lines and parsing throughout the project
433 for applications that read GW data.
435 - --data-source [string]
436 Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO].
438 - --block-size [int] (bytes)
439 Data block size to read in bytes. Default 16384 * 8 * 512 which is 512 seconds of double
440 precision data at 16384 Hz. This parameter is only used if --data-source is one of
441 white, silence, AdvVirgo, LIGO, AdvLIGO, nds.
443 - --frame-cache [filename]
444 Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).
445 This is required iff --data-source=frames
447 - --gps-start-time [int] (seconds)
448 Set the start time of the segment to analyze in GPS seconds.
449 Required unless --data-source in lvshm,framexmit
451 - --gps-end-time [int] (seconds)
452 Set the end time of the segment to analyze in GPS seconds.
453 Required unless --data-source in lvshm,framexmit
455 - --injections [filename]
456 Set the name of the LIGO light-weight XML file from which to load injections (optional).
458 - --channel-name [string]
459 Set the name of the channels to process.
460 Can be given multiple times as --channel-name=IFO=CHANNEL-NAME
462 - --nds-host [hostname]
463 Set the remote host or IP address that serves nds data.
464 This is required iff --data-source is nds
466 - --nds-port [portnumber]
467 Set the port of the remote host that serves nds data, default = 31200.
468 This is required iff --data-source is nds
470 - --nds-channel-type [string] type
471 FIXME please document
473 - --framexmit-addr [string]
474 Set the address of the framexmit service. Can be given
475 multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port
477 - --framexmit-iface [string]
478 Set the address of the framexmit interface.
480 - --dq-channel-name [string]
481 Set the name of the data quality (or state vector) channel.
482 This channel will be used to control the flow of data via the on/off bits.
483 Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME
485 - --shared-memory-partition [string]
486 Set the name of the shared memory partition for a given instrument.
487 Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME
489 - --frame-segments-file [filename]
490 Set the name of the LIGO light-weight XML file from which to load frame segments.
491 Optional iff --data-source is frames
493 - --frame-segments-name [string]
494 Set the name of the segments to extract from the segment tables.
495 Required iff --frame-segments-file is given
497 - --state-vector-on-bits [hex]
498 Set the state vector on bits to process (optional).
499 The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
500 Only currently has meaning for online (lvshm, framexmit) data
502 - --state-vector-off-bits [hex]
503 Set the state vector off bits to process (optional).
504 The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times.
505 Only currently has meaning for online (lvshm, framexmit) data
507 #### Typical usage case examples
509 -# Reading data from frames
511 --data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 --channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml --frame-segments-name=datasegments
513 -# Reading data from a fake LIGO source
515 --data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 --channel-name=H1=FAIKE-STRAIN
517 -# Reading online data via framexmit
519 --data-source=framexmit --channel-name=H1=FAIKE-STRAIN
521 -# Many other combinations possible, please add some!
523 group = optparse.OptionGroup(parser,
"Data source options",
"Use these options to set up the appropriate data source")
524 group.add_option(
"--data-source", metavar =
"source", help =
"Set the data source from [frames|framexmit|lvshm|nds|silence|white|AdvVirgo|LIGO|AdvLIGO]. Required.")
525 group.add_option(
"--block-size", type=
"int", metavar =
"bytes", default = 16384 * 8 * 512, help =
"Data block size to read in bytes. Default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz. This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.")
526 group.add_option(
"--frame-cache", metavar =
"filename", help =
"Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional). This is required iff --data-source=frames")
527 group.add_option(
"--gps-start-time", metavar =
"seconds", help =
"Set the start time of the segment to analyze in GPS seconds. Required unless --data-source=lvshm")
528 group.add_option(
"--gps-end-time", metavar =
"seconds", help =
"Set the end time of the segment to analyze in GPS seconds. Required unless --data-source=lvshm")
529 group.add_option(
"--injections", metavar =
"filename", help =
"Set the name of the LIGO light-weight XML file from which to load injections (optional).")
530 group.add_option(
"--channel-name", metavar =
"name", action =
"append", help =
"Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
531 group.add_option(
"--nds-host", metavar =
"hostname", help =
"Set the remote host or IP address that serves nds data. This is required iff --data-source=nds")
532 group.add_option(
"--nds-port", metavar =
"portnumber", type=int, default=31200, help =
"Set the port of the remote host that serves nds data. This is required iff --data-source=nds")
533 group.add_option(
"--nds-channel-type", metavar =
"type", default =
"online", help =
"Set the port of the remote host that serves nds data. This is required only if --data-source=nds. default==online")
534 group.add_option(
"--framexmit-addr", metavar =
"name", action =
"append", help =
"Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
535 group.add_option(
"--framexmit-iface", metavar =
"name", help =
"Set the multicast interface address of the framexmit service.")
536 group.add_option(
"--dq-channel-name", metavar =
"name", action =
"append", help =
"Set the name of the data quality (or state vector) channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
537 group.add_option(
"--shared-memory-partition", metavar =
"name", action =
"append", help =
"Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
538 group.add_option(
"--frame-segments-file", metavar =
"filename", help =
"Set the name of the LIGO light-weight XML file from which to load frame segments. Optional iff --data-source=frames")
539 group.add_option(
"--frame-segments-name", metavar =
"name", help =
"Set the name of the segments to extract from the segment tables. Required iff --frame-segments-file is given")
540 group.add_option(
"--state-vector-on-bits", metavar =
"bits", default = [], action =
"append", help =
"Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
541 group.add_option(
"--state-vector-off-bits", metavar =
"bits", default = [], action =
"append", help =
"Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
542 parser.add_option_group(group)
def do_seek(pipeline, seekevent):
    """
    Send a seek event to every source element of a pipeline.

    Each element yielded by pipeline.iterate_sources() is first asked to
    enter gst.STATE_READY and is then handed seekevent through its
    send_event() method.

    @param pipeline object whose iterate_sources() yields the elements to seek
    @param seekevent the event passed to each element's send_event()

    Raises RuntimeError, naming the element via get_name(), if set_state()
    does not return gst.STATE_CHANGE_SUCCESS or if send_event() returns a
    false value.
    """
    for element in pipeline.iterate_sources():
        # the element must report a successful change to READY before it is sent the event
        state_change = element.set_state(gst.STATE_READY)
        if state_change != gst.STATE_CHANGE_SUCCESS:
            raise RuntimeError("Element %s did not want to enter ready state" % element.get_name())
        handled = element.send_event(seekevent)
        if not handled:
            raise RuntimeError("Element %s did not handle seek event" % element.get_name())
def mksegmentsrcgate(pipeline, src, segment_list, seekevent = None, invert_output = False, **kwargs):
    """
    Takes a segment list and produces a gate driven by it.  Hook up your
    own input and output.

    The control stream comes from pipeparts.mksegmentsrc() built on
    segment_list; the gate itself is built on src with threshold = 1.

    @param pipeline the pipeline handed to the pipeparts constructors
    @param src the element whose output is gated
    @param segment_list forwarded to pipeparts.mksegmentsrc()
    @param seekevent if not None, do_seek(pipeline, seekevent) is called before the gate is built
    @param invert_output forwarded to pipeparts.mksegmentsrc()
    @param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate's name.
    @return the value returned by pipeparts.mkgate()
    """
    gate_control = pipeparts.mksegmentsrc(pipeline, segment_list, invert_output = invert_output)

    seek_requested = seekevent is not None
    if seek_requested:
        do_seek(pipeline, seekevent)

    return pipeparts.mkgate(pipeline, src, threshold = 1, control = gate_control, **kwargs)
649 def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = False):
651 All the conditionals and stupid pet tricks for reading real or
652 simulated h(t) data in one place.
654 Consult the append_options() function and the GWDataSourceInfo class
656 This src in general supports only one instrument although
657 GWDataSourceInfo contains dictionaries of multi-instrument things. By
658 specifying the instrument when calling this function you will get only a single
659 instrument source. A code wishing to have multiple basicsrcs will need to call
660 this function for each instrument.
663 if gw_data_source_info.data_source ==
"white":
664 src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, volume = 1.0)
665 elif gw_data_source_info.data_source ==
"silence":
666 src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size, wave = 4)
667 elif gw_data_source_info.data_source ==
"LIGO":
668 src = pipeparts.mkfakeLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
669 elif gw_data_source_info.data_source ==
"AdvLIGO":
670 src = pipeparts.mkfakeadvLIGOsrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
671 elif gw_data_source_info.data_source ==
"AdvVirgo":
672 src = pipeparts.mkfakeadvvirgosrc(pipeline, instrument = instrument, channel_name = gw_data_source_info.channel_dict[instrument], blocksize = gw_data_source_info.block_size)
673 elif gw_data_source_info.data_source ==
"frames":
674 if instrument ==
"V1":
676 src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex =
"V")
678 src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, cache_src_regex = instrument[0], cache_dsc_regex = instrument)
679 demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum =
True, channel_list = map(
"%s:%s".__mod__, gw_data_source_info.channel_dict.items()))
683 src = pipeparts.mkqueue(pipeline,
None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * gst.SECOND)
685 if gw_data_source_info.frame_segments[instrument]
is not None:
688 src = pipeparts.mkgate(pipeline, src, threshold = 1, control = pipeparts.mksegmentsrc(pipeline, gw_data_source_info.frame_segments[instrument]), name =
"%s_frame_segments_gate" % instrument)
689 pipeparts.framecpp_channeldemux_check_segments.set_probe(src.get_pad(
"src"), gw_data_source_info.frame_segments[instrument])
691 src = pipeparts.mkaudiorate(pipeline, src, skip_to_first =
True, silent =
False)
692 elif gw_data_source_info.data_source
in (
"framexmit",
"lvshm"):
694 state_vector_on_bits, state_vector_off_bits = gw_data_source_info.state_vector_on_off_bits[instrument]
696 if gw_data_source_info.data_source ==
"lvshm":
698 src = pipeparts.mklvshmsrc(pipeline, shm_name = gw_data_source_info.shm_part_dict[instrument], wait_time = 120)
699 elif gw_data_source_info.data_source ==
"framexmit":
700 src = pipeparts.mkframexmitsrc(pipeline, multicast_iface = gw_data_source_info.framexmit_iface, multicast_group = gw_data_source_info.framexmit_addr[instrument][0], port = gw_data_source_info.framexmit_addr[instrument][1], wait_time = 120)
703 raise ValueError(gw_data_source_info.data_source)
705 src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum =
True, skip_bad_files =
True)
709 strain = pipeparts.mkqueue(pipeline,
None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND * 60 * 1)
713 statevector = pipeparts.mkqueue(pipeline,
None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND * 60 * 1)
714 pipeparts.src_deferred_link(src,
"%s:%s" % (instrument, gw_data_source_info.dq_channel_dict[instrument]), statevector.get_pad(
"sink"))
715 if gw_data_source_info.dq_channel_type ==
"ODC" or gw_data_source_info.dq_channel_dict[instrument] ==
"Hrec_Flag_Quality":
717 statevector = pipeparts.mkgeneric(pipeline, statevector,
"lal_fixodc")
718 statevector = pipeparts.mkstatevector(pipeline, statevector, required_on = state_vector_on_bits, required_off = state_vector_off_bits)
719 @bottle.route(
"/%s/state_vector_on_off_gap.txt" % instrument)
720 def state_vector_state(elem = statevector):
721 t = float(lal.UTCToGPS(time.gmtime()))
722 on = elem.get_property(
"on-samples")
723 off = elem.get_property(
"off-samples")
724 gap = elem.get_property(
"gap-samples")
725 return "%.9f %d %d %d" % (t, on, off, gap)
728 src = pipeparts.mkgate(pipeline, strain, threshold = 1, control = statevector, default_state =
False, name =
"%s_state_vector_gate" % instrument)
731 src = pipeparts.mkaudiorate(pipeline, src, skip_to_first =
True, silent =
False)
732 @bottle.route(
"/%s/strain_add_drop.txt" % instrument)
733 def strain_add(elem = src):
734 t = float(lal.UTCToGPS(time.gmtime()))
735 add = elem.get_property(
"add")
736 drop = elem.get_property(
"drop")
738 return "%.9f %d %d" % (t, add / 16384., drop / 16384.)
741 src = pipeparts.mkqueue(pipeline, src, max_size_buffers = 0, max_size_bytes = 0, max_size_time = gst.SECOND * 60 * 10)
742 elif gw_data_source_info.data_source ==
"nds":
743 src = pipeparts.mkndssrc(pipeline, gw_data_source_info.nds_host, instrument, gw_data_source_info.channel_dict[instrument], gw_data_source_info.nds_channel_type, blocksize = gw_data_source_info.block_size, port = gw_data_source_info.nds_port)
745 raise ValueError(
"invalid data_source: %s" % gw_data_source_info.data_source)
752 src = pipeparts.mkaudioconvert(pipeline, src)
759 src = pipeparts.mkprogressreport(pipeline, src,
"progress_src_%s" % instrument)
765 if gw_data_source_info.injection_filename
is not None:
766 src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
769 src = pipeparts.mkqueue(pipeline, src, max_size_bytes = 0, max_size_buffers = 0, max_size_time = gst.SECOND * 64)
776 if gw_data_source_info.data_source
in (
"white",
"silence",
"LIGO",
"AdvLIGO",
"AdvVirgo",
"frames"):
777 do_seek(pipeline, gw_data_source_info.seekevent)
def mkhtgate(pipeline, src, control = None, threshold = 8.0, attack_length = 128, hold_length = 128, **kwargs):
    """
    A convenience function to provide thresholds on input data.  This can
    be used to remove large spikes / glitches etc.  Of course you can use it
    for other stuff by plugging whatever you want as input and output.

    NOTE:  the queues constructed by this code assume the attack and
    hold lengths combined are less than 1 second in duration.

    @param pipeline the pipeline handed to the pipeparts constructors
    @param src the element providing the data to be gated
    @param control optional element used as the gate's control input; when None, src is tee'd and also used as the control
    @param threshold forwarded to pipeparts.mkgate()
    @param attack_length negated, then forwarded to pipeparts.mkgate()
    @param hold_length negated, then forwarded to pipeparts.mkgate()
    @param kwargs passed through to pipeparts.mkgate()
    @return the value returned by pipeparts.mkgate()
    """
    # Only fall back to gating the stream on itself when the caller did not
    # supply a control; otherwise the control argument would be silently
    # discarded.
    if control is None:
        control = src = pipeparts.mktee(pipeline, src)
    # the one second queue is what the NOTE in the docstring refers to
    src = pipeparts.mkqueue(pipeline, src, max_size_time = gst.SECOND, max_size_bytes = 0, max_size_buffers = 0)
    return pipeparts.mkgate(pipeline, src, control = control, threshold = threshold, attack_length = -attack_length, hold_length = -hold_length, invert_control = True, **kwargs)
822 if __name__ ==
"__main__":