Package pylal :: Module frutils
[hide private]
[frames] | no frames]

Source Code for Module pylal.frutils

  1  # Copyright (C) 2009-11  Nickolas Fotopoulos 
  2  # 
  3  # This program is free software; you can redistribute it and/or modify it 
  4  # under the terms of the GNU General Public License as published by the 
  5  # Free Software Foundation; either version 2 of the License, or (at your 
  6  # option) any later version. 
  7  # 
  8  # This program is distributed in the hope that it will be useful, but 
  9  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
 11  # Public License for more details. 
 12  # 
 13  # You should have received a copy of the GNU General Public License along 
 14  # with this program; if not, write to the Free Software Foundation, Inc., 
 15  # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. 
 16   
 17  from __future__ import division 
 18   
 19  __author__ = "Nickolas Fotopoulos <nickolas.fotopoulos@ligo.org>" 
 20   
 21  from bisect import bisect_right 
 22  import httplib 
 23  import os 
 24  import os.path 
 25  import shutil 
 26  import sys 
 27  import operator 
 28   
 29  from glue.lal import Cache 
 30  from glue.segments import segment, segmentlist 
 31  from pylal.metaarray import TimeSeries, TimeSeriesList 
 32  from pylal.Fr import frgetvect1d 
 33   
 34  __all__ = ('__author__', 'FrameCache', "AutoqueryingFrameCache") 
 35   
class FrameCache(object):
    """
    FrameCache is a transparent interface to LSC data.  The user provides a
    LAL-formatted cache file and the returned FrameCache object allows
    repeated queries for channels and time, even across frame files.  It also
    supports smart, lazy local caching.  Limitations: It only works for
    one-dimensional time-series data.

    Constructor:
        FrameCache(cache_entries=None, scratchdir=None, verbose=False)

    Inputs:
        cache_entries is a list of glue.lal.CacheEntry objects or a
        glue.lal.Cache.  Data will be retrieved from the frame files
        described within.

        scratchdir determines where to locally cache frames.  If None, no
        caching is performed.

        verbose, if true, prints a message for every file copied into or
        removed from scratchdir.

    Example:
    >>> from glue import lal
    >>> from pylal import frutils
    >>> c = lal.Cache.fromfile(open("test.cache"))
    >>> d = frutils.FrameCache(c, scratchdir="/tmp", verbose=True)
    >>> data = d.fetch("H1:LSC-STRAIN", 861417967, 861417969)
    Copying /Users/nvf/temp/H-H1_RDS_C03_L2-861417967-128.gwf -->
     /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
    >>> print(data)
    [ 1.68448009e-16 1.69713183e-16 1.71046196e-16 ..., 1.80974629e-16
     1.80911765e-16 1.80804879e-16] {'dt': 6.103515625e-05, 'segments': [segment(861417967, 861417969)], 'comments': [], 'name': 'H1:LSC-STRAIN'}
    >>> exit()
    Removing /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.

    """

    def __init__(self, cache_entries=None, scratchdir=None, verbose=False):
        """Initializes interface to frame data.  See .__class__.__doc__"""
        self._verbose = verbose
        self._scratchdir = scratchdir

        # _remotefiles[i] is the file holding the span _remotesegs[i]; the
        # two sequences are kept sorted and index-aligned, so _remotesegs is
        # never coalesced.  _remotecoverage is the coalesced union used for
        # containment tests.
        self._remotefiles = []
        self._remotesegs = segmentlist()
        self._remotecoverage = segmentlist()

        if scratchdir is not None:
            # Local copies are tracked independently of the remote files.
            self._cachedfiles = []
            self._cachedsegs = segmentlist()
            self._cachecoverage = segmentlist()
        else:
            # No scratch space: the "cached" view is the very same objects
            # as the remote view, so every known file counts as cached.
            self._cachedfiles = self._remotefiles
            self._cachedsegs = self._remotesegs
            self._cachecoverage = self._remotecoverage

        if cache_entries is not None:
            self.add_cache(cache_entries)

    def add_cache(self, cache_entries):
        """
        Add information from some cache entries.

        Each entry must provide .path and .segment.  Entries whose path is
        already known are skipped.
        """
        # Set for O(1) duplicate detection; also catches duplicates that
        # appear within cache_entries itself.
        known = set(self._remotefiles)

        # Insert one entry at a time at its bisection point so that the
        # segment and file lists stay sorted and index-aligned.
        for entry in cache_entries:
            newseg, newfile = entry.segment, entry.path
            if newfile in known:
                continue
            known.add(newfile)
            insert_idx = bisect_right(self._remotesegs, newseg)
            self._remotesegs.insert(insert_idx, newseg)
            self._remotefiles.insert(insert_idx, newfile)
            self._remotecoverage |= segmentlist([newseg])

        self._remotecoverage.coalesce()

    def __del__(self):
        """
        Clear cache in local scratch.
        """
        # getattr guards against finalizing an instance whose __init__
        # never ran far enough to set the attribute.
        if getattr(self, "_scratchdir", None) is None:
            return
        # _unfetch mutates both lists, so iterate over a snapshot.
        for f, s in list(zip(self._cachedfiles, self._cachedsegs)):
            self._unfetch(f, s)

    def fetch(self, channel, start, end):
        """
        Retrieve data, caching file locations and the files themselves.

        Returns whatever _fetch returns for [start, end) of channel.
        Raises ValueError if the known frame files do not cover the
        requested span.
        """
        seg = segment(start, end)

        if not self._query(channel, start, end):
            raise ValueError("%s not found in cache" %
                             repr(segmentlist([seg]) - self._remotecoverage))

        # Need to cache files locally.
        # Note: seg *will* be in self._cachecoverage if self._scratchdir is
        # None, because the cache view then aliases the remote view.
        if seg not in self._cachecoverage:
            for f, s in zip(self._remotefiles, self._remotesegs):
                if seg.intersects(s) and s not in self._cachecoverage:
                    dest = os.path.join(self._scratchdir,
                                        os.path.split(f)[-1])
                    if self._verbose:
                        print("Copying %s -->\n %s." % (f, dest))
                    shutil.copy(f, dest)
                    ind = bisect_right(self._cachedsegs, s)
                    self._cachedfiles.insert(ind, dest)
                    self._cachedsegs.insert(ind, s)
                    self._cachecoverage |= segmentlist([s])
            # Internal sanity check: everything requested is now local.
            assert seg in self._cachecoverage

        # Finally, return the cached data
        return self._fetch(channel, start, end)

    def _query(self, channel, start, end):
        "Do we know where the frame file is?"
        return segment(start, end) in self._remotecoverage

    def _fetch(self, channel, start, end, comments=None):
        """
        Internal method to actually retrieve and return data as TimeSeries,
        assuming that self._cachedfiles and self._cachedsegs are all set.
        Does not check boundaries.

        comments is stored under the "comments" key of each piece's
        metadata; a fresh empty list is used when it is omitted.
        """
        # A literal [] default would be one shared list handed out in the
        # metadata of every result.
        if comments is None:
            comments = []

        toreturn = TimeSeriesList([])

        if start == end:
            return toreturn

        # Find first frame
        try:
            index = self._cachedsegs.find(start)
        except ValueError:
            sys.stderr.write("Couldn't find any frame files to cover "
                             "%s to %s among:\n" % (str(start), str(end)))
            sys.stderr.write(str(self._cachedfiles) + "\n")
            return toreturn

        # Get frames; an error probably means that the frames didn't cover
        # the whole period of time.  Cleanly handles frames of varying
        # lengths.
        now = start
        while now < end:
            dur = min(end, self._cachedsegs[index][1]) - now
            data, GPS_start, t_low, dt, x_unit, y_unit = \
                frgetvect1d(self._cachedfiles[index], channel, now, dur, 0)
            meta = {"name": channel, "dt": dt,
                    "segments": [segment(now, now + dur)],
                    "comments": comments}
            toreturn.append(TimeSeries(data, meta))
            now += dur
            index += 1

        if len(toreturn) == 0:
            sys.stderr.write("This print statement should never execute.\n")
            sys.stderr.write("Couldn't find all frame files needed to cover "
                             "%s to %s among:\n" % (str(start), str(end)))
            sys.stderr.write(str(self._cachedfiles) + "\n")

        toreturn = toreturn.merge_list()
        toreturn.metadata.segments.coalesce()

        return toreturn

    def unfetch(self, start, end):
        """
        Removes files from local scratch space based on start, end
        pairs.  Silently ignores non-existent times.  Remove if file end
        is between start and end.  This is biased to prevent cache misses
        for future fetches being in the future.  (Processing frames in
        chronological order)
        """
        if self._scratchdir is None:
            return

        # _unfetch mutates both lists, so iterate over a snapshot.
        for f, s in list(zip(self._cachedfiles, self._cachedsegs)):
            if start < s[1] <= end:
                self._unfetch(f, s)

    def _unfetch(self, filename, seg):
        """
        Internal method to actually remove a file from cache.

        Deletes filename from disk and drops it, together with seg, from
        the cache bookkeeping.  Does nothing when no scratch directory is
        in use.
        """
        if self._scratchdir is None:
            return
        if filename not in self._cachedfiles:
            sys.stderr.write(
                "Cache inconsistency: Delete request for file not in cache."
                "\n")
            return
        if self._verbose:
            print("Removing %s." % filename)
        os.remove(filename)
        self._cachedfiles.remove(filename)
        self._cachedsegs.remove(seg)
        self._cachecoverage -= segmentlist([seg])
234 235 # 236 # Set up a FrameCache subclass that queries LDR on-the-fly for frame locations; 237 # Contains many bits stolen from ligo_data_find in Glue. 238 # 239
def validateProxy(path):
    """
    Test that the proxy certificate is RFC 3820
    compliant and that it is valid for at least
    the next 15 minutes.

    Returns True when the proxy at path passes both checks.  Raises
    ImportError if M2Crypto is unavailable, and RuntimeError if the
    certificate cannot be loaded, lacks the proxyCertInfo extension, has
    expired, or expires in less than 15 minutes.
    """
    try:
        import M2Crypto
    except ImportError as e:
        sys.stderr.write("""
validateProxy requires the M2Crypto module.

On CentOS 5 and other RHEL-based platforms
this package is available from the EPEL
repository by doing

yum install m2crypto

For Debian Lenny this package is available
by doing

apt-get install python-m2crypto

Mac OS X users can find this package in MacPorts.

%s
""" % e + "\n")
        raise

    # load the proxy from path
    try:
        proxy = M2Crypto.X509.load_cert(path)
    except Exception as e:
        msg = "Unable to load proxy from path %s : %s" % (path, e)
        raise RuntimeError(msg)

    # make sure the proxy is RFC 3820 compliant
    try:
        proxy.get_ext("proxyCertInfo")
    except LookupError:
        rfc_proxy_msg = """\
Could not find a RFC 3820 compliant proxy credential.
Please run 'grid-proxy-init -rfc' and try again.
"""
        raise RuntimeError(rfc_proxy_msg)

    # attempt to make sure the proxy is still good for more than 15 minutes
    import calendar
    import time
    try:
        expireASN1 = str(proxy.get_not_after())
        expireGMT = time.strptime(expireASN1, "%b %d %H:%M:%S %Y %Z")
        expireUTC = calendar.timegm(expireGMT)
        now = int(time.time())
        secondsLeft = expireUTC - now
    except Exception:
        # problem getting or parsing time so just let the client
        # continue and pass the issue along to the server
        secondsLeft = 3600

    if secondsLeft <= 0:
        msg = """\
Your proxy certificate is expired.

Please generate a new proxy certificate and
try again.
"""
        raise RuntimeError(msg)

    if secondsLeft < (60 * 15):
        msg = """\
Your proxy certificate expires in less than
15 minutes.

Please generate a new proxy certificate and
try again.
"""
        raise RuntimeError(msg)

    # return True to indicate validated proxy
    return True
320
def findCredential():
    """
    Follow the usual path that GSI libraries would
    follow to find a valid proxy credential but
    also allow an end entity certificate to be used
    along with an unencrypted private key if they
    are pointed to by X509_USER_CERT and X509_USER_KEY
    since we expect this will be the output from
    the eventual ligo-login wrapper around
    kinit and then myproxy-login.

    Returns a (certFile, keyFile) tuple; for a proxy both elements are the
    same path.  Raises RuntimeError when no usable credential is found.
    """
    rfc_proxy_msg = """\
Could not find a RFC 3820 compliant proxy credential.
Please run 'grid-proxy-init -rfc' and try again.
"""
    environ = os.environ

    # use X509_USER_PROXY from environment if set
    if 'X509_USER_PROXY' in environ:
        filePath = environ['X509_USER_PROXY']
        if validateProxy(filePath):
            return filePath, filePath
        raise RuntimeError(rfc_proxy_msg)

    # use X509_USER_CERT and X509_USER_KEY if both are set; the pair is
    # returned as-is, without validation
    if 'X509_USER_CERT' in environ and 'X509_USER_KEY' in environ:
        return environ['X509_USER_CERT'], environ['X509_USER_KEY']

    # search for proxy file on disk
    path = "/tmp/x509up_u%d" % os.getuid()
    if os.access(path, os.R_OK):
        if validateProxy(path):
            return path, path
        raise RuntimeError(rfc_proxy_msg)

    # if we get here could not find a credential
    raise RuntimeError(rfc_proxy_msg)
364
def query_LDR(server, port, site, frameType, gpsStart, gpsEnd, urlType=None, noproxy=False):
    """
    Return a list of URLs to frames covering the requested time, as returned
    by the LDR server.

    server and port identify the LDR host; port may be None to use the
    protocol default.  site, frameType, gpsStart and gpsEnd are substituted
    into the request path, and urlType, if given, is appended to it.  A
    plain HTTP request is made when noproxy is true or port is 80;
    otherwise the credential found by findCredential() is used for HTTPS.

    Raises ImportError if cjson is unavailable and RuntimeError if the
    server cannot be queried or answers with a status other than 200.
    """
    try:
        import cjson
    except ImportError as e:
        sys.stderr.write("""
frutils requires the cjson module.

On CentOS 5 and other RHEL-based platforms
this package is available from the EPEL
repository by doing

yum install python-cjson

For Debian Lenny this package is available by doing

apt-get install python-cjson

Mac OS X users can find this package in MacPorts.

%s
""" % e + "\n")
        raise

    url = "/LDR/services/data/v1/gwf/%s/%s/%s,%s" % (site, frameType, gpsStart, gpsEnd)
    # if a URL type is specified append it to the path
    if urlType:
        url += "/%s" % urlType

    # request JSON output
    url += ".json"

    if noproxy or port == 80:
        # make unauthenticated request
        h = httplib.HTTPConnection(server, port)
    else:
        certFile, keyFile = findCredential()
        # Pass the port through: omitting it silently sent every
        # authenticated request to the default HTTPS port.  port=None
        # still selects that default.
        h = httplib.HTTPSConnection(server, port, key_file=keyFile, cert_file=certFile)

    try:
        # query the server
        try:
            h.request("GET", url)
            response = h.getresponse()
        except Exception as e:
            msg = "Unable to query server %s: %s\n\nPerhaps you need a valid proxy credential?\n" % (server, e)
            raise RuntimeError(msg)

        # the server did respond to check the status
        if response.status != 200:
            msg = "Server returned code %d: %s" % (response.status, response.reason)
            body = response.read()
            msg += body
            raise RuntimeError(msg)

        # since status is 200 OK read the URLs
        body = response.read()
    finally:
        # always release the socket, whether or not the query succeeded
        h.close()

    # decode the JSON
    return cjson.decode(body)
427
class AutoqueryingFrameCache(FrameCache):
    """
    This subclass of FrameCache will query ligo_data_find automatically,
    so no LAL-cache files are required.  Limitation: you'll need one instance
    per frame type.

    Constructor:
        AutoqueryingFrameCache(frametype, hostPortString=None, scratchdir=None,
            verbose=False)

    Inputs:
        frametype is the type of GWF frame you seek (e.g. RDS_R_L1).
        hostPortString is the name of the LDR server and optionally,
            with colon separation, the port (e.g. ldr.ligo.caltech.edu).
            If None, the LIGO_DATAFIND_SERVER environment variable is used.
        scratchdir determines where to locally cache frames.  If None, no
            caching is performed.

    Example:
    >>> from pylal import frutils
    >>> d = frutils.AutoqueryingFrameCache(frametype="H1_RDS_C03_L2", scratchdir="/tmp", verbose=True)
    >>> data = d.fetch("H1:LSC-STRAIN", 861417967, 861417969)
    Copying /Users/nvf/temp/H-H1_RDS_C03_L2-861417967-128.gwf -->
     /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.
    >>> print(data)
    [ 1.68448009e-16 1.69713183e-16 1.71046196e-16 ..., 1.80974629e-16
     1.80911765e-16 1.80804879e-16] {'dt': 6.103515625e-05, 'segments': [segment(861417967, 861417969)], 'comments': [], 'name': 'H1:LSC-STRAIN'}
    >>> exit()
    Removing /tmp/H-H1_RDS_C03_L2-861417967-128.gwf.

    Using AutoqueryingFrameCache outside of LDG clusters, using Caltech as a
    gateway:
     * Just the first time you do this procedure: "sudo mkdir /data && sudo chown
       albert.einstein /data" (replace albert.einstein with your local username;
       /data may be different for different clusters)
     * Set the LIGO_DATAFIND_SERVER environment variable to ldr.ligo.caltech.edu
       (or the LDR server of the LDG cluster nearest you)
     * Use "sshfs -o ssh_command=gsissh
       albert.einstein@ldas-pcdev1.ligo.caltech.edu:/data /data" (replace
       albert.einstein with your cluster username)
     * Use "umount /data" when you're done.  Unmounting cleanly will help prevent
       headaches the next time you want to set this up.
    """

    def __init__(self, frametype, hostPortString=None, scratchdir=None,
                 verbose=False):
        """
        Set up an initially empty cache bound to one frame type and one
        LDR server.  Raises ValueError if frametype is empty or if no
        server can be determined.
        """
        FrameCache.__init__(self, None, scratchdir, verbose)

        if not frametype:
            raise ValueError("frametype required")
        self.frametype = frametype

        if hostPortString is None:
            if 'LIGO_DATAFIND_SERVER' in os.environ:
                hostPortString = os.environ['LIGO_DATAFIND_SERVER']
            else:
                raise ValueError("no way to determine LIGO_DATAFIND_SERVER")

        if ':' not in hostPortString:
            # no port specified
            self.host = hostPortString
            self.port = None
        else:
            # server and port specified
            self.host, portString = hostPortString.split(':')
            self.port = int(portString)

    def _query(self, channel, start, end):
        """
        Do we know where the frame file is?

        If the span is not already covered, ask the LDR server for frame
        URLs, add whatever it returns to the cache and check again.  The
        first character of channel is passed to the server as the site.
        """
        if segment(start, end) in self._remotecoverage:
            return True
        urls = query_LDR(self.host, self.port, channel[0], self.frametype, start, end, urlType="file")
        if urls:
            new = Cache.from_urls(urls, coltype=int)
            new.sort(key=operator.attrgetter("segment"))
            self.add_cache(new)
        return segment(start, end) in self._remotecoverage
502