35 gobject.threads_init()
41 from gstlal
import datasource
42 from gstlal
import pipeparts
43 from gstlal
import simplehandler
57 List-like object to hold lal.CacheEntry objects, and run
58 os.unlink() on the .path of each as they are removed from the list
59 or when the list is garbage collected. All errors during file
62 Note that there is no way to remove a CacheEntry from this list
63 without the file it represents being deleted. If, after adding a
64 CacheEntry to this list it is decided the file must not be deleted,
65 then instead of removing it from the list it must be replaced with
66 something else, e.g. None, and that item can then be removed from
71 >>> from glue.lal import CacheEntry
72 >>> # create a cache, and add an entry
73 >>> cache = tempcache()
74 >>> cache.append(CacheEntry("- - - - file://localhost/tmp/blah.txt"))
75 >>> # now remove it without the file being deleted
79 def __delitem__(self, i):
81 os.unlink(self[i].path)
84 super(tempcache, self).__delitem__(i)
86 def __delslice__(self, i, j):
88 for entry
in self[i:j]:
95 super(tempcache, self).__delslice__(i, j)
111 def __init__(self, *args, **kwargs):
112 super(Handler, self).__init__(*args, **kwargs)
115 def do_on_message(self, bus, message):
116 if message.type == gst.MESSAGE_ELEMENT
and message.structure.get_name() ==
"GstMultiFileSink":
117 self.cache.append(pipeparts.framecpp_filesink_cache_entry_from_mfs_message(message))
131 def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = False):
133 if gw_data_source_info.data_source ==
"frames":
134 if instrument ==
"V1":
136 src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, use_mmap =
True, cache_src_regex =
"V")
138 src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, use_mmap =
True, cache_src_regex = instrument[0], cache_dsc_regex = instrument)
139 demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum =
True, channel_list = map(
"%s:%s".__mod__, gw_data_source_info.channel_dict.items()))
140 pipeparts.framecpp_channeldemux_set_units(demux, dict.fromkeys(demux.get_property(
"channel-list"),
"strain"))
142 src = pipeparts.mkqueue(pipeline,
None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * gst.SECOND)
143 pipeparts.src_deferred_link(demux,
"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), src.get_pad(
"sink"))
145 src = pipeparts.mkaudiorate(pipeline, src, skip_to_first =
True, silent =
False)
147 raise ValueError(
"invalid data_source: %s" % gw_data_source_info.data_source)
150 src = pipeparts.mkaudioconvert(pipeline, src)
154 src = pipeparts.mkprogressreport(pipeline, src,
"progress_src_%s" % instrument)
157 if gw_data_source_info.injection_filename
is not None:
158 src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
160 src = pipeparts.mkqueue(pipeline, src, max_size_bytes = 0, max_size_buffers = 0, max_size_time = gst.SECOND * 64)
164 datasource.do_seek(pipeline, gw_data_source_info.seekevent)
179 def build_pipeline(pipeline, data_source_info, output_path = tempfile.gettempdir(), sample_rate =
None, description =
"TMPFILE_DELETE_ME", channel_comment =
None, frame_duration = 1, frames_per_file = 1024, verbose =
False):
185 channelmux_input_dict = {}
187 for instrument, channel_name
in data_source_info.channel_dict.items():
192 src = mkbasicsrc(pipeline, data_source_info, instrument, verbose = verbose)
198 if sample_rate
is not None:
200 src = pipeparts.mkcapsfilter(pipeline, src,
"audio/x-raw-float, rate=[%d,MAX]" % sample_rate)
201 src = pipeparts.mkresample(pipeline, src, quality = 9)
202 src = pipeparts.mkcapsfilter(pipeline, src,
"audio/x-raw-float, rate=%d" % sample_rate)
208 src = pipeparts.mkframecppchannelmux(pipeline, {
"%s:%s" % (instrument, channel_name): src}, frame_duration = frame_duration, frames_per_file = frames_per_file)
209 for pad
in src.sink_pads():
210 if channel_comment
is not None:
211 pad.set_property(
"comment", channel_comment)
212 pad.set_property(
"pad-type",
"FrProcData")
213 pipeparts.mkframecppfilesink(pipeline, src, frame_type =
"%s_%s" % (instrument, description), path = output_path)
225 def cache_hoft(data_source_info, channel_comment = "cached h(t)
for inspiral search
", verbose = False, **kwargs):
231 mainloop = gobject.MainLoop()
232 pipeline = gst.Pipeline(
"pipeline")
233 handler =
Handler(mainloop, pipeline)
237 print >>sys.stderr,
"assembling pipeline ...",
238 build_pipeline(pipeline, data_source_info, channel_comment = channel_comment, verbose = verbose, **kwargs)
240 print >>sys.stderr,
"done"
249 print >>sys.stderr,
"setting pipeline state to playing ..."
250 if pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_SUCCESS:
251 raise RuntimeError(
"pipeline did not enter playing state")
254 print >>sys.stderr,
"running pipeline ..."