Package glue :: Module datafind
[hide private]
[frames | no frames]

Source Code for Module glue.datafind

  1  # -*- coding: utf-8 -*- 
  2  # Copyright (C) 2012  Scott Koranda, Duncan Macleod 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify it 
  5  # under the terms of the GNU General Public License as published by the 
  6  # Free Software Foundation; either version 2 of the License, or (at your 
  7  # option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, but 
 10  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
 12  # Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License along 
 15  # with this program; if not, write to the Free Software Foundation, Inc., 
 16  # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. 
 17   
 18  """The client library for the LIGO Data Replicator (LDR) service. 
 19   
 20  The DataFind service allows users to query for the location of 
 21  Gravitational-Wave Frame (GWF) files containing data from the current 
 22  LIGO and Virgo gravitational-wave detectors. 
 23   
 24  This module defines the L{GWDataFindHTTPConnection} and 
 25  L{GWDataFindHTTPSConnection} class objects, for connecting to an LDR 
 26  server in open and authenticated access modes respectively. 
 27  The authenticated L{GWDataFindHTTPSConnection} connection requires users 
 28  have a valid X509 certificate that is registered with the server in 
 29  question. 
 30   
 31  A new connection can be opened as follows: 
 32   
 33  >>> from glue.datafind import GWDataFindHTTPConnection 
 34  >>> connection = GWDataFindHTTPConnection(host, port) 
 35   
 36  and similar for the HTTPS version. 
 37   
 38  Users on the LIGO Data Grid (LDG) can connect without giving the name of 
 39  the relevant host, so long as the C{LIGO_DATAFIND_SERVER} environment 
 40  variable is defined: 
 41   
 42  >>> connection = GWDataFindHTTPConnection() 
 43   
 44  Queries for frames can be made using the L{find_frame_urls<GWDataFindHTTPConnection.find_frame_urls>} method of the 
 45  relevant connection: 
 46   
 47  >>> cache = connection.find_frame_urls('L', 'L1_R', 1093564816, 1093651216) 
 48   
 49  By default, the returned L{Cache<glue.lal.Cache>} object includes both C{gsiftp} and local C{file} versions of each frame, but the C{urltype} keyword argument can be given to return only one of those: 
 50   
 51  >>> cache = connection.find_frame_urls('L', 'L1_R', 1093564816, 1093651216, urltype='file') 
 52   
 53  See the documentation for each connection method for more detailed examples. 
 54  """ 
 55   
 56  from __future__ import division 
 57   
 58  import os 
 59  import sys 
 60  import time 
 61  import calendar 
 62  import six.moves.http_client 
 63  import re 
 64  import unittest 
 65   
 66  from OpenSSL import crypto 
 67   
 68  try: 
 69      from cjson import decode 
 70  except ImportError: 
 71      from json import loads as decode 
 72   
 73  from glue import (lal, git_version, segments) 
 74   
 # Package metadata; the version and date come from the git_version module
 # imported from glue above.
 75  __author__ = "Duncan Macleod <duncan.macleod@ligo.org>" 
 76  __credits__ = "Scott Koranda <scott.koranda@ligo.org>" 
 77  __version__ = git_version.id 
 78  __date__    = git_version.date 
 79   
 # Name of the environment variable that find_server() reads to obtain the
 # default "host[:port]"; it is also interpolated into the __init__ docstrings.
 80  _server_env = "LIGO_DATAFIND_SERVER" 
 # Path prefix placed in front of every request URL built by the connection
 # methods in this module.
 81  _url_prefix = "/LDR/services/data/v1" 
 82   
 83   
class GWDataFindHTTPConnection(six.moves.http_client.HTTPConnection):
    """Connection to LIGO data replicator service using HTTP.

    @param host: the name of the server with which to connect
    @param port: the port on which to connect
    @param **kwargs: other keyword arguments accepted by
                     L{httplib.HTTPConnection}

    @type  host: L{str}
    @type  port: L{int}
    """
    # GPS time type passed as C{coltype} whenever cache entries are built
    LIGOTimeGPSType = lal.LIGOTimeGPS

    def __init__(self, host=None, **kwargs):
        """Connect to the LDR host using HTTP.

        Default host is defined by the %s environment variable.

        @param host: name of the server; when empty, the host (and the
                     default port) are taken from L{find_server}
        @param **kwargs: passed on to the C{HTTPConnection} constructor
        """
        if not host:
            host, port = find_server()
            # an explicitly given port keyword wins over the environment
            kwargs.setdefault("port", port)
        six.moves.http_client.HTTPConnection.__init__(self, host, **kwargs)
    # docstrings are None under ``python -OO``; only interpolate when present
    if __init__.__doc__:
        __init__.__doc__ %= _server_env

    def _requestresponse(self, method, url, body=None, headers=None):
        """Internal method to perform request and verify reponse.

        @param method: name of the method to use (e.g. 'GET')
        @param url   : remote URL to query
        @param body  : optional request body
        @param headers: optional L{dict} of extra request headers

        @type  method: L{str}
        @type  url   : L{str}

        @returns: the response object for the query; the caller is
                  responsible for reading it

        @raises RuntimeError: if the request fails or the server does not
                              answer with status 200
        """
        if headers is None:
            headers = {}
        try:
            self.request(method, url, body, headers)
            response = self.getresponse()
        except Exception as e:
            raise RuntimeError("Unable to query server %s: %s\n\n"
                               "Perhaps you need a valid proxy credential?\n"
                               % (self.host, e))
        if response.status != 200:
            raise RuntimeError("Server returned code %d: %s%s"
                               % (response.status, response.reason,
                                  response.read()))
        return response

    def _urls_to_cache(self, urllist):
        """Build a L{Cache<glue.lal.Cache>} from a list of frame URLs."""
        return lal.Cache([
            lal.CacheEntry.from_T050017(url, coltype=self.LIGOTimeGPSType)
            for url in urllist])

    def ping(self):
        """Ping the LDR host to test for life

        @raises RuntimeError: when ping fails
        @returns: 0 if ping was successful
        """
        url = "%s/gwf/%s/%s/%s,%s" % (_url_prefix, 'H', 'R', '1', '2')
        # read the (empty) HEAD response so that the connection is free to
        # send another request afterwards
        self._requestresponse("HEAD", url).read()
        return 0

    def find_observatories(self, match=None):
        """Query the LDR host for observatories. Use match to
        restrict returned observatories to those matching the
        regular expression.

        Example:

        >>> connection.find_observatories()
        ['AGHLT', 'G', 'GHLTV', 'GHLV', 'GHT', 'H', 'HL', 'HLT',
         'L', 'T', 'V', 'Z']
        >>> connection.find_observatories("H")
        ['H', 'HL', 'HLT']

        @type  match: L{str}
        @param match:
            name to match return observatories against

        @returns: L{list} of observatory prefixes
        """
        url = "%s/gwf.json" % _url_prefix
        response = self._requestresponse("GET", url)
        sitelist = sorted(set(decode(response.read())))
        if match:
            regmatch = re.compile(match)
            sitelist = [site for site in sitelist if regmatch.search(site)]
        return sitelist

    def find_types(self, site=None, match=None):
        """Query the LDR host for frame types. Use site to restrict
        query to given observatory prefix, and use match to restrict
        returned types to those matching the regular expression.

        Example:

        >>> connection.find_types("L", "RDS")
        ['L1_RDS_C01_LX',
         'L1_RDS_C02_LX',
         'L1_RDS_C03_L2',
         'L1_RDS_R_L1',
         'L1_RDS_R_L3',
         'L1_RDS_R_L4',
         'PEM_RDS_A6',
         'RDS_R_L1',
         'RDS_R_L2',
         'RDS_R_L3',
         'TESTPEM_RDS_A6']

        @param  site: single-character name of site to match
        @param match: type-name to match against

        @type  site: L{str}
        @type match: L{str}

        @returns: L{list} of frame types
        """
        if site:
            # only the first character of the site name is used
            url = "%s/gwf/%s.json" % (_url_prefix, site[0])
        else:
            url = "%s/gwf/all.json" % _url_prefix
        response = self._requestresponse("GET", url)
        typelist = sorted(set(decode(response.read())))
        if match:
            regmatch = re.compile(match)
            typelist = [name for name in typelist if regmatch.search(name)]
        return typelist

    def find_times(self, site, frametype, gpsstart=None, gpsend=None):
        """Query the LDR for times for which frames are avaliable

        Use gpsstart and gpsend to restrict the returned times to
        this semiopen interval. The restriction is only applied when
        both are given.

        @returns: L{segmentlist<glue.segments.segmentlist>}

        @param site:
            single-character name of site to match
        @param frametype:
            name of frametype to match
        @param gpsstart:
            integer GPS start time of query
        @param gpsend:
            integer GPS end time of query

        @type       site: L{str}
        @type  frametype: L{str}
        @type   gpsstart: L{int}
        @type     gpsend: L{int}
        """
        # compare against None so that a GPS time of 0 is still honoured
        if gpsstart is not None and gpsend is not None:
            url = ("%s/gwf/%s/%s/segments/%d,%d.json"
                   % (_url_prefix, site, frametype, gpsstart, gpsend))
        else:
            url = ("%s/gwf/%s/%s/segments.json"
                   % (_url_prefix, site, frametype))

        response = self._requestresponse("GET", url)
        seglist = decode(response.read())
        return segments.segmentlist(map(segments.segment, seglist))

    def find_frame(self, framefile, urltype=None, on_missing="warn"):
        """Query the LDR host for a single framefile

        @returns: L{Cache<glue.lal.Cache>}

        @param framefile:
            name (or path) of the frame file to find; only the base name
            is used, and it must consist of four C{-}-separated fields
            of which the first two are the site and the frame type
        @param urltype:
            file scheme to search for (e.g. 'file')
        @param on_missing:
            what to do when the requested frame isn't found, one of:
                - C{'warn'} (default): print a warning,
                - C{'error'}: raise an L{RuntimeError}, or
                - C{'ignore'}: do nothing

        @type  framefile: L{str}
        @type    urltype: L{str}
        @type on_missing: L{str}

        @raises ValueError: if C{on_missing} is not a recognised option
        @raises RuntimeError: if given framefile is malformed
        @raises RuntimeError: if no frames are found and C{on_missing='error'}
        """
        if on_missing not in ("warn", "error", "ignore"):
            raise ValueError("on_missing must be 'warn', 'error', or 'ignore'.")
        framefile = os.path.basename(framefile)
        # parse file name for site, frame type
        try:
            site, frametype, _, _ = framefile.split("-")
        except ValueError as e:
            raise RuntimeError("Error parsing filename %s: %s" % (framefile, e))
        url = ("%s/gwf/%s/%s/%s.json"
               % (_url_prefix, site, frametype, framefile))
        response = self._requestresponse("GET", url)
        urllist = decode(response.read())
        if not urllist:
            if on_missing == "warn":
                sys.stderr.write("No files found!\n")
            elif on_missing == "error":
                raise RuntimeError("No files found!")
        cache = self._urls_to_cache(urllist)
        # keep only the entries with the requested URL scheme, if any
        if urltype:
            cache = lal.Cache([e for e in cache if e.scheme == urltype])
        return cache

    def find_latest(self, site, frametype, urltype=None, on_missing="warn"):
        """Query for the most recent framefile of a given type.

        @param  site:
            single-character name of site to match
        @param frametype:
            name of frametype to match
        @param urltype:
            file scheme to search for (e.g. 'file')
        @param on_missing:
            what to do when the requested frame isn't found, one of:
                - C{'warn'} (default): print a warning,
                - C{'error'}: raise an L{RuntimeError}, or
                - C{'ignore'}: do nothing

        @type       site: L{str}
        @type  frametype: L{str}
        @type    urltype: L{str}
        @type on_missing: L{str}

        @returns: L{Cache<glue.lal.Cache>} with one
                  L{entry<glue.lal.CacheEntry>}

        @raises ValueError: if C{on_missing} is not a recognised option
        @raises RuntimeError: if no frames are found and C{on_missing='error'}
        """
        if on_missing not in ('warn', 'error', 'ignore'):
            raise ValueError("on_missing must be 'warn', 'error', or 'ignore'.")
        url = "%s/gwf/%s/%s/latest" % (_url_prefix, site, frametype)
        # if a URL type is specified append it to the path
        if urltype:
            url += "/%s" % urltype
        # request JSON output
        url += ".json"
        response = self._requestresponse("GET", url)
        urllist = decode(response.read())
        if not urllist:
            if on_missing == "warn":
                sys.stderr.write("No files found!\n")
            elif on_missing == "error":
                raise RuntimeError("No files found!")
        return self._urls_to_cache(urllist)

    def find_frame_urls(self, site, frametype, gpsstart, gpsend,
                        match=None, urltype=None, on_gaps="warn"):
        """Find the framefiles for the given type in the [start, end) interval
        frame

        @param site:
            single-character name of site to match
        @param frametype:
            name of frametype to match
        @param gpsstart:
            integer GPS start time of query
        @param gpsend:
            integer GPS end time of query
        @param match:
            regular expression to match against
        @param urltype:
            file scheme to search for (e.g. 'file')
        @param on_gaps:
            what to do when the returned frames do not cover the whole
            requested interval, one of:
                - C{'warn'} (default): print a warning,
                - C{'error'}: raise an L{RuntimeError}, or
                - C{'ignore'}: do nothing

        @type       site: L{str}
        @type  frametype: L{str}
        @type   gpsstart: L{int}
        @type     gpsend: L{int}
        @type      match: L{str}
        @type    urltype: L{str}
        @type    on_gaps: L{str}

        @returns: L{Cache<glue.lal.Cache>}

        @raises ValueError: if C{on_gaps} is not a recognised option
        @raises RuntimeError: if gaps are found and C{on_gaps='error'}
        """
        if on_gaps not in ("warn", "error", "ignore"):
            raise ValueError("on_gaps must be 'warn', 'error', or 'ignore'.")
        url = ("%s/gwf/%s/%s/%s,%s"
               % (_url_prefix, site, frametype, gpsstart, gpsend))
        # if a URL type is specified append it to the path
        if urltype:
            url += "/%s" % urltype
        # request JSON output
        url += ".json"
        # append a regex if input
        if match:
            url += "?match=%s" % match
        # make query
        response = self._requestresponse("GET", url)
        urllist = decode(response.read())

        out = self._urls_to_cache(urllist)

        if on_gaps == "ignore":
            return out

        # check that the returned frames cover the requested span
        span = segments.segment(gpsstart, gpsend)
        seglist = segments.segmentlist(e.segment for e in out).coalesce()
        if span in seglist:
            return out

        missing = (segments.segmentlist([span]) - seglist).coalesce()
        msg = "Missing segments: \n%s" % "\n".join(map(str, missing))
        if on_gaps == "warn":
            sys.stderr.write("%s\n" % msg)
            return out
        raise RuntimeError(msg)
398
class GWDataFindHTTPSConnection(six.moves.http_client.HTTPSConnection,
                                GWDataFindHTTPConnection):
    """Authenticated (HTTPS) connection to the LIGO data replicator service.

    The query methods are inherited from L{GWDataFindHTTPConnection}; the
    transport is provided by C{HTTPSConnection}.
    """

    def __init__(self, host=None, **kwargs):
        """Open an HTTPS connection to the LDR host.

        When no host is given, the host and default port are read from
        the %s environment variable.

        @param host: name of the server with which to connect
        @param **kwargs: passed on to the C{HTTPSConnection} constructor
        """
        if not host:
            host, default_port = find_server()
            # do not override a port that the caller passed explicitly
            kwargs.setdefault("port", default_port)
        six.moves.http_client.HTTPSConnection.__init__(self, host, **kwargs)
    __init__.__doc__ %= _server_env
412 413
def validate_proxy(path):
    """Validate the users X509 proxy certificate

    Tests that the certificate stored at C{path} is not a legacy
    (non RFC 3820) proxy and that it has not yet expired.

    A certificate carrying the C{proxyCertInfo} extension is taken to be
    RFC 3820 compliant. One without it is rejected only when its subject
    common name starts with C{'proxy'}.

    @param path: location of the PEM-format certificate file
    @type  path: L{str}

    @returns: L{True} if the certificate validates
    @raises IOError: if the certificate file cannot be read
    @raises RuntimeError: if the certificate cannot be validated
    """
    # load the proxy from path
    try:
        with open(path, 'rt') as f:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
    except IOError as e:
        e.args = ('Failed to load proxy certificate: %s' % str(e),)
        raise

    # try and read proxyCertInfo
    rfc3820 = False
    for i in range(cert.get_extension_count()):
        short_name = cert.get_extension(i).get_short_name()
        # the short name may be returned as bytes; normalise before comparing
        if isinstance(short_name, bytes):
            short_name = short_name.decode('utf-8')
        if short_name == 'proxyCertInfo':
            rfc3820 = True
            break

    # otherwise test common name
    if not rfc3820:
        # a subject without a CN cannot be a 'proxy...' legacy proxy
        common_name = cert.get_subject().CN or ''
        if common_name.startswith('proxy'):
            raise RuntimeError('Could not find a valid proxy credential')

    # check time remaining
    expiry = cert.get_notAfter()
    if isinstance(expiry, bytes):
        expiry = expiry.decode('utf-8')
    expiryu = calendar.timegm(time.strptime(expiry, "%Y%m%d%H%M%SZ"))
    if expiryu < time.time():
        raise RuntimeError('Required proxy credential has expired')

    # return True to indicate validated proxy
    return True
454
def find_credential():
    """Locate the users X509 certificate and key files

    The following locations are tried in order:

        1. the proxy file named by C{X509_USER_PROXY}, which is validated
           and used as both certificate and key,
        2. the files named by C{X509_USER_CERT} and C{X509_USER_KEY}
           (both must be set; they are returned without validation),
        3. the standard proxy location C{/tmp/x509up_u<uid>}, which is
           validated and used as both certificate and key.

    @returns: C{(cert, key)} tuple of file paths

    @raises RuntimeError: if the proxy found via either method cannot
                          be validated
    @raises RuntimeError: if the cert and key files cannot be located
    """
    rfc_proxy_msg = ("Could not find a RFC 3820 compliant proxy credential. "
                     "Please run 'grid-proxy-init -rfc' and try again.")

    # use X509_USER_PROXY from environment if set
    proxy_file = os.environ.get('X509_USER_PROXY')
    if proxy_file is not None:
        if not validate_proxy(proxy_file):
            raise RuntimeError(rfc_proxy_msg)
        return proxy_file, proxy_file

    # use X509_USER_CERT and X509_USER_KEY if set
    cert_file = os.environ.get('X509_USER_CERT')
    key_file = os.environ.get('X509_USER_KEY')
    if cert_file is not None and key_file is not None:
        return cert_file, key_file

    # search for proxy file on disk
    path = "/tmp/x509up_u%d" % os.getuid()
    if os.access(path, os.R_OK):
        if not validate_proxy(path):
            raise RuntimeError(rfc_proxy_msg)
        return path, path

    # if we get here could not find a credential
    raise RuntimeError(rfc_proxy_msg)
497
def find_server():
    """Find the default server host from the environment

    This method uses the C{LIGO_DATAFIND_SERVER} variable to construct
    a C{(host, port)} tuple. The variable has the form C{host} or
    C{host:port}; everything after the first colon is the port.

    @returns: C{(host, port)}: the L{str} host name and L{int} port number,
              with C{port} set to L{None} when the variable gives no port

    @raises RuntimeError: if the C{LIGO_DATAFIND_SERVER} environment variable
                          is not set
    @raises ValueError: if the port part is not an integer
    """
    server = os.environ.get(_server_env)
    if server is None:
        raise RuntimeError("Environment variable %s is not set" % _server_env)
    host, _, port = server.partition(':')
    # a missing or empty port ("host" or "host:") is reported as None
    return host, (int(port) if port else None)
520 521
522 -class TestLDR(unittest.TestCase):
523 """Small suite of test functions. 524 525 Probably won't work if you're not on an LDAS 526 machine... 527 """
528 - def test_HTTPConnection(self):
529 h = GWDataFindHTTPConnection() 530 h.close()
531
532 - def test_HTTPSConnection(self):
533 h = GWDataFindHTTPSConnection() 534 h.close()
535
536 - def test_ping(self):
537 h = GWDataFindHTTPConnection() 538 h.ping() 539 h.close()
540
541 - def test_latest(self):
542 h = GWDataFindHTTPConnection() 543 h.find_latest("L", "R") 544 h.close()
545
546 - def test_find_observatories(self):
550
551 - def test_find_times(self):
552 h = GWDataFindHTTPConnection() 553 h.find_times("L", "R") 554 h.close()
555
556 - def test_find_frame_urls(self):
557 h = GWDataFindHTTPConnection() 558 h.find_frame_urls("L", "R", 1000000000, 1000001000, on_gaps="ignore") 559 h.close()
# Run the test suite when the module is executed as a script.
if __name__ == "__main__":
    unittest.main()