gstlal-inspiral  0.4.2
 All Classes Namespaces Files Functions Variables Pages
hoftcache.py
1 # Copyright (C) 2013 Kipp Cannon, Chad Hanna, Drew Keppel
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 
18 #
19 # =============================================================================
20 #
21 # Preamble
22 #
23 # =============================================================================
24 #
25 
26 
27 import os
28 import sys
29 import tempfile
30 
31 
32 import pygtk
33 pygtk.require("2.0")
34 import gobject
35 gobject.threads_init()
36 import pygst
37 pygst.require("0.10")
38 import gst
39 
40 
41 from gstlal import datasource
42 from gstlal import pipeparts
43 from gstlal import simplehandler
44 
45 
46 #
47 # =============================================================================
48 #
49 # File Clean-Up Machinery
50 #
51 # =============================================================================
52 #
53 
54 
class tempcache(list):
    """
    List-like object to hold lal.CacheEntry objects, and run
    os.unlink() on the .path of each as they are removed from the list
    with ``del`` (single index or slice, including extended slices) or
    when the list is garbage collected.  Errors during file removal
    (e.g. the file is already gone, or the item has no .path attribute)
    are ignored.

    Note that removing an item with ``del`` always attempts to delete
    the file it represents.  If, after adding a CacheEntry to this list
    it is decided the file must not be deleted, then instead of
    removing it from the list it must be replaced with something else,
    e.g. None, and that item can then be removed from the list.

    NOTE:  list methods that do not go through __delitem__, such as
    pop() and remove(), take entries out of the list WITHOUT deleting
    their files.

    Example:

    >>> from glue.lal import CacheEntry
    >>> # create a cache, and add an entry
    >>> cache = tempcache()
    >>> cache.append(CacheEntry("- - - - file://localhost/tmp/blah.txt"))
    >>> # now remove it without the file being deleted
    >>> cache[-1] = None
    >>> del cache[-1]
    """
    @staticmethod
    def _unlink_all(entries):
        # Best-effort removal of each entry's file.  OSError (missing
        # file, permissions), AttributeError (e.g. a None placeholder)
        # and the like are deliberately ignored, but unlike a bare
        # except this does not swallow KeyboardInterrupt / SystemExit.
        for entry in entries:
            try:
                os.unlink(entry.path)
            except Exception:
                pass

    def __delitem__(self, i):
        # i may be an integer or a slice object.  Slices reach this
        # method for every ``del x[a:b]`` on Python 3 and for extended
        # slices on Python 2, so both forms must be handled.
        try:
            doomed = self[i] if isinstance(i, slice) else [self[i]]
        except (IndexError, TypeError):
            # invalid index:  let list.__delitem__() below raise the
            # appropriate error
            doomed = []
        self._unlink_all(doomed)
        super(tempcache, self).__delitem__(i)

    def __delslice__(self, i, j):
        # Python 2 only:  simple slice deletion is routed here instead
        # of through __delitem__.
        self._unlink_all(self[i:j])
        super(tempcache, self).__delslice__(i, j)

    def __del__(self):
        # delete every remaining file when the list is collected
        del self[:]
99 
100 
101 #
102 # =============================================================================
103 #
104 # Handler
105 #
106 # =============================================================================
107 #
108 
109 
class Handler(simplehandler.Handler):
    """
    Pipeline message handler that, in addition to the behaviour of
    simplehandler.Handler, collects a cache entry for every file
    reported by a GstMultiFileSink element message.

    The collected entries are held in self.cache, a tempcache, so the
    corresponding files are deleted when that list is garbage collected
    (or entries are removed from it with ``del``).
    """
    def __init__(self, *args, **kwargs):
        super(Handler, self).__init__(*args, **kwargs)
        self.cache = tempcache()

    def do_on_message(self, bus, message):
        # Only element messages posted by a GstMultiFileSink describe a
        # newly written file; anything else is left for the base class.
        # NOTE(review): the True/False return presumably tells
        # simplehandler.Handler whether the message was consumed here --
        # confirm against simplehandler.
        from_multifilesink = message.type == gst.MESSAGE_ELEMENT and message.structure.get_name() == "GstMultiFileSink"
        if not from_multifilesink:
            return False
        new_entry = pipeparts.framecpp_filesink_cache_entry_from_mfs_message(message)
        self.cache.append(new_entry)
        return True
120 
121 
122 #
123 # =============================================================================
124 #
125 # Modified Version of mkbasicsrc from datasource.py
126 #
127 # =============================================================================
128 #
129 
130 
def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = False):
    """
    Build the h(t) source chain for a single instrument and return the
    last element of that chain.

    This is a reduced variant of datasource.mkbasicsrc() that only
    supports reading from frame files:  if
    gw_data_source_info.data_source is not "frames", ValueError is
    raised before any element is added to the pipeline.

    Chain:  lalcachesrc -> framecppchanneldemux -> queue -> audiorate
    -> audioconvert [-> progressreport if verbose] [-> injections ->
    queue if an injection file is given].  Before returning, the
    pipeline is seeked with gw_data_source_info.seekevent via
    datasource.do_seek().
    """
    if gw_data_source_info.data_source != "frames":
        raise ValueError("invalid data_source: %s" % gw_data_source_info.data_source)

    # select this instrument's frame files from the cache
    if instrument == "V1":
        # FIXME Hack because virgo often just uses "V" in the file names rather than "V1".  We need to sieve on "V"
        cache_selection = {"cache_src_regex": "V"}
    else:
        cache_selection = {"cache_src_regex": instrument[0], "cache_dsc_regex": instrument}
    src = pipeparts.mklalcachesrc(pipeline, location = gw_data_source_info.frame_cache, use_mmap = True, **cache_selection)

    # demultiplex the frame files;  every requested channel is tagged
    # with units of "strain"
    requested_channels = ["%s:%s" % item for item in gw_data_source_info.channel_dict.items()]
    demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum = True, channel_list = requested_channels)
    pipeparts.framecpp_channeldemux_set_units(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))

    # allow frame reading and decoding to occur in a different thread;
    # the demuxer's pad for this instrument's channel is linked to the
    # queue once it appears
    decoupler = pipeparts.mkqueue(pipeline, None, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 8 * gst.SECOND)
    this_channel = "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument])
    pipeparts.src_deferred_link(demux, this_channel, decoupler.get_pad("sink"))

    # FIXME: remove this when pipeline can handle disconts
    head = pipeparts.mkaudiorate(pipeline, decoupler, skip_to_first = True, silent = False)

    # provide an audioconvert element to allow Virgo data (which is
    # single-precision) to be adapted into the pipeline
    head = pipeparts.mkaudioconvert(pipeline, head)

    if verbose:
        head = pipeparts.mkprogressreport(pipeline, head, "progress_src_%s" % instrument)

    injection_file = gw_data_source_info.injection_filename
    if injection_file is not None:
        head = pipeparts.mkinjections(pipeline, head, injection_file)
        # let the injection code run in a different thread than the
        # whitener, etc.
        head = pipeparts.mkqueue(pipeline, head, max_size_bytes = 0, max_size_buffers = 0, max_size_time = gst.SECOND * 64)

    # seek the pipeline
    # FIXME: remove
    datasource.do_seek(pipeline, gw_data_source_info.seekevent)

    return head
168 
169 
170 #
171 # =============================================================================
172 #
173 # Pipeline
174 #
175 # =============================================================================
176 #
177 
178 
def build_pipeline(pipeline, data_source_info, output_path = tempfile.gettempdir(), sample_rate = None, description = "TMPFILE_DELETE_ME", channel_comment = None, frame_duration = 1, frames_per_file = 1024, verbose = False):
    """
    Add, for EVERY instrument in data_source_info.channel_dict, an
    independent chain that reads that instrument's h(t) channel,
    optionally downsamples it, and writes it back out as frame files.

    Parameters:
        pipeline:  the pipeline to add elements to.
        data_source_info:  data source description; channel_dict maps
            instrument -> channel name and selects what is processed.
        output_path:  directory handed to the frame file sink.  The
            default is tempfile.gettempdir() as evaluated when this
            module is imported.
        sample_rate:  if not None, the data are resampled to this rate.
            A capsfilter ahead of the resampler only admits input rates
            >= sample_rate, so only downsampling is allowed (caps
            negotiation presumably fails otherwise).
        description:  frame type is "<instrument>_<description>".
        channel_comment:  if not None, set as the "comment" property of
            every channel mux sink pad.
        frame_duration, frames_per_file:  passed to the channel muxer.
        verbose:  passed to mkbasicsrc() (adds a progress report).

    Every channel mux sink pad is marked with pad-type "FrProcData".
    Nothing is returned;  the elements are added to pipeline.
    """
    for instrument, channel_name in data_source_info.channel_dict.items():
        #
        # retrieve h(t)
        #

        src = mkbasicsrc(pipeline, data_source_info, instrument, verbose = verbose)

        #
        # optionally resample
        #

        if sample_rate is not None:
            # make sure we're *down*sampling
            src = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw-float, rate=[%d,MAX]" % sample_rate)
            src = pipeparts.mkresample(pipeline, src, quality = 9)
            src = pipeparts.mkcapsfilter(pipeline, src, "audio/x-raw-float, rate=%d" % sample_rate)

        #
        # pack into frame files for output
        #

        mux = pipeparts.mkframecppchannelmux(pipeline, {"%s:%s" % (instrument, channel_name): src}, frame_duration = frame_duration, frames_per_file = frames_per_file)
        for pad in mux.sink_pads():
            if channel_comment is not None:
                pad.set_property("comment", channel_comment)
            pad.set_property("pad-type", "FrProcData")
        pipeparts.mkframecppfilesink(pipeline, mux, frame_type = "%s_%s" % (instrument, description), path = output_path)
214 
215 
216 #
217 # =============================================================================
218 #
219 # Collect and Cache h(t)
220 #
221 # =============================================================================
222 #
223 
224 
def cache_hoft(data_source_info, channel_comment = "cached h(t) for inspiral search", verbose = False, **kwargs):
    """
    Copy h(t) for every instrument in data_source_info into frame files
    and return a tempcache listing the files written.

    Parameters:
        data_source_info:  data source description passed to
            build_pipeline().
        channel_comment:  comment attached to each written channel.
        verbose:  print progress messages to stderr.
        **kwargs:  forwarded to build_pipeline() (e.g. output_path,
            sample_rate, description, frame_duration, frames_per_file).

    Returns:
        the Handler's tempcache.  When this object is garbage collected
        the frame files are deleted, so keep a reference alive as long
        as the files are needed.

    Raises:
        RuntimeError if the pipeline fails to enter the PLAYING state.

    The pipeline is always returned to the NULL state before this
    function returns or raises, releasing its threads and open files.
    """
    #
    # build pipeline
    #

    mainloop = gobject.MainLoop()
    pipeline = gst.Pipeline("pipeline")
    handler = Handler(mainloop, pipeline)

    try:
        if verbose:
            print >>sys.stderr, "assembling pipeline ...",
        build_pipeline(pipeline, data_source_info, channel_comment = channel_comment, verbose = verbose, **kwargs)
        if verbose:
            print >>sys.stderr, "done"

        #
        # run pipeline
        #

        if verbose:
            print >>sys.stderr, "setting pipeline state to playing ..."
        # set_state() legitimately returns ASYNC (sinks still
        # prerolling) or NO_PREROLL (live sources);  only FAILURE
        # indicates an error
        if pipeline.set_state(gst.STATE_PLAYING) == gst.STATE_CHANGE_FAILURE:
            raise RuntimeError("pipeline did not enter playing state")

        if verbose:
            print >>sys.stderr, "running pipeline ..."
        mainloop.run()
    finally:
        # shut the pipeline down on success and on error alike so its
        # streaming threads and file handles are released
        pipeline.set_state(gst.STATE_NULL)

    #
    # return tempcache object. when this object is garbage collected
    # the frame files will be deleted. keep a reference alive as long
    # as you wish to preserve the files.
    #

    return handler.cache
263 
264 
265  return handler.cache