1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 from __future__ import division
18
19 __author__ = "Nickolas Fotopoulos <nickolas.fotopoulos@ligo.org>"
20
21 from bisect import bisect_right
22 import httplib
23 import os
24 import os.path
25 import shutil
26 import sys
27 import operator
28
29 from glue.lal import Cache
30 from glue.segments import segment, segmentlist
31 from pylal.metaarray import TimeSeries, TimeSeriesList
32 from pylal.Fr import frgetvect1d
33
34 __all__ = ('__author__', 'FrameCache', "AutoqueryingFrameCache")
35
37 """
38 FrameCache is a transparent interface to LSC data. The user provides a LAL-
39 formatted cache file and the returned FrameCache object allows repeated
40 queries for channels and time, even across frame files. It also supports
41 smart, lazy local caching. Limitations: It only works for one-dimensional
42 time-series data.
43
44 Constructor:
45 FrameCache(cache_entries=None, scratchdir=None, verbose=False)
46
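    Inputs:
    cache_entries is a list of glue.lal.CacheEntry objects or a
    glue.lal.Cache. Data will be retrieved from the frame files described
    within. If None, the cache starts empty and entries can be added
    later with add_cache().

    scratchdir determines where to locally cache frames. If None, no
    caching is performed.

    verbose, if True, prints a message whenever a frame file is copied
    to or removed from scratchdir.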
53
54 Example:
55 >>> from glue import lal
56 >>> from pylal import frutils
57 >>> c = lal.Cache.fromfile(open("test.cache"))
58 >>> d = frutils.FrameCache(c, scratchdir="/tmp", verbose=True)
59 >>> data = d.fetch("H1:LSC-STRAIN", 861417967, 861417969)
60 Copying /Users/nvf/temp/H-H1_RDS_C03_L2-861417967-128.gwf -->
61 /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
62 >>> print(data)
63 [ 1.68448009e-16 1.69713183e-16 1.71046196e-16 ..., 1.80974629e-16
64 1.80911765e-16 1.80804879e-16] {'dt': 6.103515625e-05, 'segments': [segment(861417967, 861417969)], 'comments': [], 'name': 'H1:LSC-STRAIN'}
65 >>> exit()
66 Removing /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
67
68 """
69
def __init__(self, cache_entries=None, scratchdir=None, verbose=False):
    """
    Set up the bookkeeping for known frame files and their local copies.

    cache_entries: optional collection of cache entries; when not None it
        is handed to self.add_cache().
    scratchdir: directory that fetch() copies frame files into; None
        turns local copying off.
    verbose: stored as self._verbose; when true, copies and removals are
        announced on stdout.
    """
    self._verbose = verbose
    self._scratchdir = scratchdir

    # Known frame files and their segments (parallel lists), plus the
    # union of those segments.
    self._remotefiles = []
    self._remotesegs = segmentlist()
    self._remotecoverage = segmentlist()

    if scratchdir is None:
        # Without scratch space the "cached" view is just another name for
        # the remote bookkeeping, so reads go straight to the listed files.
        self._cachedfiles = self._remotefiles
        self._cachedsegs = self._remotesegs
        self._cachecoverage = self._remotecoverage
    else:
        # Separate bookkeeping for files that have been copied to scratch.
        self._cachedfiles = []
        self._cachedsegs = segmentlist()
        self._cachecoverage = segmentlist()

    if cache_entries is not None:
        self.add_cache(cache_entries)
93
def add_cache(self, cache_entries):
    """
    Add information from some cache entries.

    cache_entries: iterable of objects exposing .path and .segment. It is
        traversed exactly once, so a one-shot iterator works as well as a
        list.

    Entries whose path is already known are skipped. Each new entry is
    inserted so that self._remotesegs stays ordered (bisect_right) and
    self._remotefiles stays parallel to it; the entry's segment is also
    merged into self._remotecoverage.
    """
    # Set of known paths: O(1) duplicate checks, kept in step with the list.
    known_paths = set(self._remotefiles)

    for entry in cache_entries:
        newfile = entry.path
        if newfile in known_paths:
            continue
        known_paths.add(newfile)

        newseg = entry.segment
        insert_idx = bisect_right(self._remotesegs, newseg)
        self._remotesegs.insert(insert_idx, newseg)
        self._remotefiles.insert(insert_idx, newfile)
        self._remotecoverage |= segmentlist([newseg])

    self._remotecoverage.coalesce()
115
117 """
118 Clear cache in local scratch.
119 """
120 if self._scratchdir is None:
121 return
122 for f,s in zip(self._cachedfiles, self._cachedsegs):
123 self._unfetch(f, s)
124 return
125
def fetch(self, channel, start, end):
    """
    Retrieve data, caching file locations and the files themselves.

    channel: channel name, passed on to self._query() and self._fetch().
    start, end: bounds of the requested span.

    Returns whatever self._fetch(channel, start, end) returns.
    Raises ValueError when self._query() reports that the span is not
    covered by the known frame files; the message lists the missing part.
    """
    seg = segment(start, end)

    if not self._query(channel, start, end):
        missing = segmentlist([seg]) - self._remotecoverage
        raise ValueError("%s not found in cache" % repr(missing))

    if seg not in self._cachecoverage:
        # Copy every known frame file that overlaps the request and is not
        # yet recorded as cached.
        for remote_path, remote_seg in zip(self._remotefiles, self._remotesegs):
            if not seg.intersects(remote_seg) or remote_seg in self._cachecoverage:
                continue
            dest = os.path.join(self._scratchdir, os.path.basename(remote_path))
            if self._verbose:
                print("Copying %s -->\n %s." % (remote_path, dest))
            shutil.copy(remote_path, dest)
            # Keep the cached lists ordered and parallel to each other.
            ind = bisect_right(self._cachedsegs, remote_seg)
            self._cachedfiles.insert(ind, dest)
            self._cachedsegs.insert(ind, remote_seg)
            self._cachecoverage |= segmentlist([remote_seg])
        assert seg in self._cachecoverage

    return self._fetch(channel, start, end)
152
def _query(self, channel, start, end):
    """
    Do we know where the frame file is?

    Returns True when the span from start to end lies within
    self._remotecoverage. The channel argument is not used here.
    """
    requested = segment(start, end)
    return requested in self._remotecoverage
156
def _fetch(self, channel, start, end, comments=None):
    """
    Internal method to actually retrieve and return data as TimeSeries,
    assuming that self._cachedfiles and self._cachedsegs are all set.
    Does not check boundaries.

    channel: channel name handed to frgetvect1d.
    start, end: bounds of the span to read.
    comments: optional list stored under "comments" in the metadata of
        every piece that is read; a fresh empty list is used when omitted.

    Returns an empty TimeSeriesList when start == end or when no cached
    segment contains start; otherwise the merged result of reading each
    cached frame file in turn.
    """
    # A new list per call: a shared default list would be the same object
    # in the metadata of every series ever returned.
    if comments is None:
        comments = []

    toreturn = TimeSeriesList([])

    if start == end:
        return toreturn

    # Locate the cached segment that contains the start of the request.
    try:
        index = self._cachedsegs.find(start)
    except ValueError:
        sys.stderr.write(
            "Couldn't find any frame files to cover %s to %s among:\n"
            % (str(start), str(end)))
        sys.stderr.write(str(self._cachedfiles) + "\n")
        return toreturn

    # Walk forward one cached file at a time, reading from each file only
    # up to its own end or the end of the request, whichever comes first.
    now = start
    while now < end:
        dur = min(end, self._cachedsegs[index][1]) - now
        # Only the samples and the sample spacing are used below.
        # NOTE(review): meaning of the trailing 0 argument is not visible
        # here -- confirm against frgetvect1d's documentation.
        data, GPS_start, t_low, dt, x_unit, y_unit = \
            frgetvect1d(self._cachedfiles[index], channel, now, dur, 0)
        meta = {"name": channel, "dt": dt,
                "segments": [segment(now, now + dur)], "comments": comments}
        toreturn.append(TimeSeries(data, meta))
        now += dur
        index += 1

    if len(toreturn) == 0:
        sys.stderr.write("This print statement should never execute.\n")
        sys.stderr.write(
            "Couldn't find all frame files needed to cover %s to %s among:\n"
            % (str(start), str(end)))
        sys.stderr.write(str(self._cachedfiles) + "\n")

    toreturn = toreturn.merge_list()
    toreturn.metadata.segments.coalesce()

    return toreturn
202
def unfetch(self, start, end):
    """
    Removes files from local scratch space based on start, end
    pairs. Silently ignores non-existent times. Remove if file end
    is between start and end. This is biased to prevent cache misses
    for future fetches being in the future. (Processing frames in
    chronological order)

    A cached file is removed when start < (end of its segment) <= end.
    Does nothing when no scratch directory is configured.
    """
    if self._scratchdir is None:
        return

    # _unfetch() removes items from both lists, so iterate over a snapshot
    # rather than over the live lists.
    for f, s in list(zip(self._cachedfiles, self._cachedsegs)):
        if start < s[1] <= end:
            self._unfetch(f, s)
217
219 """
220 Internal method to actually remove a file from cache.
221 """
222 if self._scratchdir is None:
223 return
224 if filename not in self._cachedfiles:
225 print >>sys.stderr, \
226 "Cache inconsistency: Delete request for file not in cache."
227 return
228 if self._verbose: print "Removing %s." % filename
229 os.remove(filename)
230 self._cachedfiles.remove(filename)
231 self._cachedsegs.remove(seg)
232 self._cachecoverage -= segmentlist([seg])
233 return
234
235
236
237
238
239
def validateProxy(path):
    """
    Test that the proxy certificate is RFC 3820
    compliant and that it is valid for at least
    the next 15 minutes.

    path: location of the certificate file to load.

    Returns True when every check passes.
    Raises ImportError when M2Crypto is unavailable (after printing
    installation hints to stderr), and RuntimeError when the certificate
    cannot be loaded, lacks the proxyCertInfo extension, has expired, or
    expires within 15 minutes.
    """
    try:
        import M2Crypto
    except ImportError as e:
        sys.stderr.write(
            "\n"
            "validateProxy requires the M2Crypto module.\n"
            "\n"
            "On CentOS 5 and other RHEL-based platforms\n"
            "this package is available from the EPEL\n"
            "repository by doing\n"
            "\n"
            "yum install m2crypto\n"
            "\n"
            "For Debian Lenny this package is available\n"
            "by doing\n"
            "\n"
            "apt-get install python-m2crypto\n"
            "\n"
            "Mac OS X users can find this package in MacPorts.\n"
            "\n"
            "%s\n"
            "\n" % e
        )
        raise

    # Load the certificate from disk.
    try:
        proxy = M2Crypto.X509.load_cert(path)
    except Exception as e:
        msg = "Unable to load proxy from path %s : %s" % (path, e)
        raise RuntimeError(msg)

    # The proxyCertInfo extension is what marks the certificate as an
    # RFC 3820 proxy.
    try:
        proxy.get_ext("proxyCertInfo")
    except LookupError:
        rfc_proxy_msg = (
            "Could not find a RFC 3820 compliant proxy credential.\n"
            "Please run 'grid-proxy-init -rfc' and try again.\n"
        )
        raise RuntimeError(rfc_proxy_msg)

    # Work out how many seconds remain before the certificate expires.
    import time
    import calendar
    try:
        expireASN1 = str(proxy.get_not_after())
        expireGMT = time.strptime(expireASN1, "%b %d %H:%M:%S %Y %Z")
        expireUTC = calendar.timegm(expireGMT)
        now = int(time.time())
        secondsLeft = expireUTC - now
    except Exception:
        # Best effort: if the expiry time cannot be read or parsed, carry
        # on as though an hour remains instead of failing here.
        secondsLeft = 3600

    if secondsLeft <= 0:
        msg = (
            "Your proxy certificate is expired.\n"
            "\n"
            "Please generate a new proxy certificate and\n"
            "try again.\n"
        )
        raise RuntimeError(msg)

    if secondsLeft < (60 * 15):
        msg = (
            "Your proxy certificate expires in less than\n"
            "15 minutes.\n"
            "\n"
            "Please generate a new proxy certificate and\n"
            "try again.\n"
        )
        raise RuntimeError(msg)

    return True
320
def findCredential():
    """
    Follow the usual path that GSI libraries would
    follow to find a valid proxy credential but
    also allow an end entity certificate to be used
    along with an unencrypted private key if they
    are pointed to by X509_USER_CERT and X509_USER_KEY
    since we expect this will be the output from
    the eventual ligo-login wrapper around
    kinit and then myproxy-login.

    Lookup order:
      1. X509_USER_PROXY, validated with validateProxy();
      2. X509_USER_CERT together with X509_USER_KEY (not validated);
      3. /tmp/x509up_u<uid>, if readable, validated with validateProxy().

    Returns a (certificate file, key file) pair; for a proxy both
    elements are the same path.
    Raises RuntimeError when no usable credential is found.
    """
    rfc_proxy_msg = (
        "Could not find a RFC 3820 compliant proxy credential.\n"
        "Please run 'grid-proxy-init -rfc' and try again.\n"
    )

    # 1. Explicitly configured proxy.
    if 'X509_USER_PROXY' in os.environ:
        filePath = os.environ['X509_USER_PROXY']
        if validateProxy(filePath):
            return filePath, filePath
        raise RuntimeError(rfc_proxy_msg)

    # 2. Certificate/key pair; both variables must be set, otherwise fall
    # through to the default proxy location.
    if 'X509_USER_CERT' in os.environ and 'X509_USER_KEY' in os.environ:
        certFile = os.environ['X509_USER_CERT']
        keyFile = os.environ['X509_USER_KEY']
        return certFile, keyFile

    # 3. Per-user default proxy file.
    path = "/tmp/x509up_u%d" % os.getuid()
    if os.access(path, os.R_OK):
        if validateProxy(path):
            return path, path
        raise RuntimeError(rfc_proxy_msg)

    # Nothing usable was found.
    raise RuntimeError(rfc_proxy_msg)
364
def query_LDR(server, port, site, frameType, gpsStart, gpsEnd, urlType=None, noproxy=False):
    """
    Return a list of URLs to frames covering the requested time, as returned
    by the LDR server.

    server, port: host and port of the LDR server. A port of None selects
        the default port of the connection type.
    site, frameType, gpsStart, gpsEnd: values placed in the request path.
    urlType: optional extra path component appended to the request.
    noproxy: when true (or when port == 80) a plain HTTP connection is
        used; otherwise HTTPS with the credential from findCredential().

    Returns the decoded JSON body of the response.
    Raises ImportError when cjson is unavailable (after printing
    installation hints to stderr) and RuntimeError when the request fails
    or the server answers with a status other than 200.
    """
    try:
        import cjson
    except ImportError as e:
        sys.stderr.write(
            "\n"
            "frutils requires the cjson module.\n"
            "\n"
            "On CentOS 5 and other RHEL-based platforms\n"
            "this package is available from the EPEL\n"
            "repository by doing\n"
            "\n"
            "yum install python-cjson\n"
            "\n"
            "For Debian Lenny this package is available by doing\n"
            "\n"
            "apt-get install python-cjson\n"
            "\n"
            "Mac OS X users can find this package in MacPorts.\n"
            "\n"
            "%s\n"
            "\n" % e
        )
        raise

    url = "/LDR/services/data/v1/gwf/%s/%s/%s,%s" % (site, frameType, gpsStart, gpsEnd)
    if urlType:
        url += "/%s" % urlType
    # The response is requested as JSON.
    url += ".json"

    if noproxy or port == 80:
        h = httplib.HTTPConnection(server, port)
    else:
        certFile, keyFile = findCredential()
        # Pass the port through so a non-default HTTPS port is honoured;
        # None still selects the HTTPS default.
        h = httplib.HTTPSConnection(server, port, key_file=keyFile, cert_file=certFile)

    try:
        try:
            h.request("GET", url)
            response = h.getresponse()
        except Exception as e:
            msg = "Unable to query server %s: %s\n\nPerhaps you need a valid proxy credential?\n" % (server, e)
            raise RuntimeError(msg)

        # Read the body before the connection is closed; it is either the
        # payload or the server's error text.
        body = response.read()

        if response.status != 200:
            msg = "Server returned code %d: %s" % (response.status, response.reason)
            msg += body
            raise RuntimeError(msg)
    finally:
        # Always release the socket, whether the query succeeded or not.
        h.close()

    return cjson.decode(body)
427
429 """
430 This subclass of FrameCache will query ligo_data_find automatically,
431 so no LAL-cache files are required. Limitation: you'll need one instance
432 per frame type.
433
434 Constructor:
435 AutoqueryingFrameCache(frametype, hostPortString=None, scratchdir=None,
436 verbose=False)
437
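    Inputs:
    frametype is the type of GWF frame you seek (e.g. RDS_R_L1).
    hostPortString is the name of the LDR server and optionally,
    with colon separation, the port (e.g. ldr.ligo.caltech.edu or
    ldr.ligo.caltech.edu:80). If None, the value of the
    LIGO_DATAFIND_SERVER environment variable is used.
    scratchdir determines where to locally cache frames. If None, no
    caching is performed.
    verbose, if True, prints a message whenever a frame file is copied
    to or removed from scratchdir.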
444
445 Example:
446 >>> from pylal import frutils
447 >>> d = frutils.AutoqueryingFrameCache(frametype="H1_RDS_C03_L2", scratchdir="/tmp", verbose=True)
448 >>> data = d.fetch("H1:LSC-STRAIN", 861417967, 861417969)
449 Copying /Users/nvf/temp/H-H1_RDS_C03_L2-861417967-128.gwf -->
450 /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
451 >>> print(data)
452 [ 1.68448009e-16 1.69713183e-16 1.71046196e-16 ..., 1.80974629e-16
453 1.80911765e-16 1.80804879e-16] {'dt': 6.103515625e-05, 'segments': [segment(861417967, 861417969)], 'comments': [], 'name': 'H1:LSC-STRAIN'}
454 >>> exit()
455 Removing /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
456
457 Using AutoqueryingFrameCache outside of LDG clusters, using Caltech as a
458 gateway:
459 * Just the first time you do this procedure: "sudo mkdir /data && sudo chown
460 albert.einstein /data" (replace albert.einstein with your local username;
461 /data may be different for different clusters)
462 * Set the LIGO_DATAFIND_SERVER environment variable to ldr.ligo.caltech.edu
463 (or the LDR server of the LDG cluster nearest you)
464 * Use "sshfs -o ssh_command=gsissh
465 albert.einstein@ldas-pcdev1.ligo.caltech.edu:/data /data" (replace
466 albert.einstein with your cluster username)
467 * Use "umount /data" when you're done. Unmounting cleanly will help prevent
468 headaches the next time you want to set this up.
469 """
def __init__(self, frametype, hostPortString=None, scratchdir=None,
             verbose=False):
    """
    Set up an initially empty frame cache that looks frames up on an LDR
    server.

    frametype: frame type to query for; must be non-empty.
    hostPortString: "host" or "host:port" of the LDR server. When None,
        the LIGO_DATAFIND_SERVER environment variable is used instead.
    scratchdir, verbose: forwarded to FrameCache.__init__.

    Sets self.frametype, self.host and self.port (None when no port was
    given).
    Raises ValueError when frametype is empty or when no server can be
    determined.
    """
    FrameCache.__init__(self, None, scratchdir, verbose)

    if not frametype:
        raise ValueError("frametype required")
    self.frametype = frametype

    if hostPortString is None:
        if 'LIGO_DATAFIND_SERVER' not in os.environ:
            raise ValueError("no way to determine LIGO_DATAFIND_SERVER")
        hostPortString = os.environ['LIGO_DATAFIND_SERVER']

    if ':' in hostPortString:
        # "host:port" -- split the two parts and keep the port as an int.
        self.host, portString = hostPortString.split(':')
        self.port = int(portString)
    else:
        # Bare host name; no port was specified.
        self.host = hostPortString
        self.port = None
491
def _query(self, channel, start, end):
    """
    Do we know where the frame file is?

    If the span from start to end is not already covered, ask the LDR
    server (via query_LDR) for file URLs, add whatever it returns to the
    cache, and check the coverage again. The first character of channel
    (e.g. the "H" of "H1:LSC-STRAIN") is passed as the site.

    Returns True when the span is covered after the lookup, else False.
    """
    wanted = segment(start, end)
    if wanted in self._remotecoverage:
        return True

    site = channel[0]
    urls = query_LDR(self.host, self.port, site, self.frametype, start, end, urlType="file")
    if urls:
        new = Cache.from_urls(urls, coltype=int)
        # Sort the new entries by segment before adding them.
        new.sort(key=operator.attrgetter("segment"))
        self.add_cache(new)

    return wanted in self._remotecoverage
502