Package glue :: Module ldbd

Source Code for Module glue.ldbd

"""
lightweight database dumper
Copyright (C) 2003 Duncan Brown
This file is part of the lightweight database dumper (ldbd)

The ldbd module provides classes for manipulating LIGO metadata database
tables.

References:
http://www.ligo.caltech.edu/docs/T/T990101-02.pdf
http://www.ligo.caltech.edu/docs/T/T990023-01.pdf
http://ldas-sw.ligo.caltech.edu/doc/db2/doc/html/ilwdformat.html

This file is part of the Grid LSC User Environment (GLUE)

GLUE is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program.  If not, see <http://www.gnu.org/licenses/>.
"""
from glue import git_version
__author__ = 'Duncan Brown <dbrown@ligo.caltech.edu>'
__date__ = git_version.date
__version__ = git_version.id

import os
import sys
import string
import re
import csv
try:
  import DB2
except ImportError:
  pass

try:  # python < 3
    long
except NameError:  # python >= 3
    long = int

from glue.ligolw.types import string_format_func

# create the csv parser and initialize a dialect for LIGO_LW streams

class LIGOLWStream(csv.Dialect):
  """
  Create a csv parser dialect for parsing LIGO_LW streams
  """
  delimiter = ','
  doublequote = False
  escapechar = '\\'
  lineterminator = '\n'
  quotechar = '"'
  quoting = csv.QUOTE_ALL
  skipinitialspace = True

csv.register_dialect("LIGOLWStream",LIGOLWStream)

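As a quick illustration (not part of the module), the registered dialect splits a LIGO_LW Stream row into its fields. The sample row below is invented, and the snippet assumes glue.ldbd has been imported so that the "LIGOLWStream" dialect is registered:

import csv

row = '"process:process_id:0","a \\"quoted\\" string",3.14'
fields = next(csv.reader([row], "LIGOLWStream"))
# fields == ['process:process_id:0', 'a "quoted" string', '3.14']
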
class LIGOLwParseError(Exception):
  """Error parsing LIGO lightweight XML file"""
  pass

class LIGOLwDBError(Exception):
  """Error interacting with database"""
  pass

class Xlator(dict):
  """
  All in one multiple string substitution class from the python cookbook
  """
  def _make_regex(self):
    """
    Build a re object based on keys in the current dictionary
    """
    return re.compile("|".join(map(re.escape, self.keys())))

  def __call__(self, match):
    """
    Handler invoked for each regex match
    """
    return self[match.group(0)]

  def xlat(self, text):
    """
    Translate text, returns the modified text
    """
    return self._make_regex().sub(self,text)

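For illustration only (not part of the module), an Xlator built from a mapping rewrites every occurrence of its keys in a single pass; this is how the xmltostr and strtoxml translators are used further down:

escape = Xlator({'&' : '&amp;', '<' : '&lt;', '>' : '&gt;'})
print(escape.xlat('if a<b & b>c'))
# prints: if a&lt;b &amp; b&gt;c
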
class LIGOMetadataDatabase:
  """
  Contains a tuple of tables in the order that insertions should
  occur and a dictionary of mandatory unique id fields for each
  table that must be populated.
  """
  def __init__(self,database):
    """
    database = the name of the LIGO database to initialize
    """
    self.database = database
    self.uniqueids = {}
    conn = DB2.connect(dsn=database, uid='', pwd='')
    curs = conn.cursor()
    curs.execute("SELECT tabname FROM syscat.tables WHERE definer<>'SYSIBM' "
      "AND TYPE='T' ORDER BY PARENTS ASC")
    self.tables = curs.fetchall()
    curs.execute("SELECT tabname, colname FROM syscat.columns " +
      "WHERE typename = 'CHARACTER' AND length = 13 AND nulls = 'N'")
    for tab, col in curs.fetchall():
      tab = tab.lower()
      col = col.lower()
      try:
        self.uniqueids[tab][col] = 'ilwd:char'
      except KeyError:
        self.uniqueids[tab] = {}
        self.uniqueids[tab][col] = 'ilwd:char'
    curs.close()
    conn.close()

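A hedged usage sketch (not part of the module): constructing a LIGOMetadataDatabase requires the DB2 module and a reachable database; the DSN below is a placeholder.

ldb = LIGOMetadataDatabase('ldbd_tst')   # placeholder DSN
print(ldb.tables)       # table names from syscat.tables, in insertion order
print(ldb.uniqueids)    # per-table mandatory ilwd:char columns
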
class UniqueIds:
  """
  Contains a dictionary of unique ids which can be queried based
  on name. If a unique id does not exist in the dictionary, one
  is fetched from the database.
  """
  def __init__(self,curs):
    """
    curs = database cursor to the currently open database
    """
    self.uqids = {}
    self.curs = curs

  def lookup(self,istring):
    """
    istring = the ilwd:char string corresponding to a unique id
    """
    try:
      return self.uqids[istring]
    except KeyError:
      curs = self.curs
      curs.execute('VALUES BLOB(GENERATE_UNIQUE())')
      self.uqids[istring] = curs.fetchone()[0]
      return self.uqids[istring]

class LIGOLwParser:
  """
  Provides methods for parsing the data from a LIGO lightweight XML
  file parsed with pyRXP into a dictionary
  """

  def __init__(self):
    """
    Initializes a LIGO lightweight XML parser with the necessary
    regular expressions and function for tuple translation
    """
    self.tabrx = re.compile(r'(\A[a-z0-9_]+:|\A)([a-z0-9_]+):table\Z')
    self.colrx = re.compile(r'(\A[a-z0-9_]+:|\A)([a-z0-9_]+:|\A)([a-z0-9_]+)\Z')
    self.llsrx = re.compile(r'\A\s*"')
    self.rlsrx = re.compile(r'"\s*\Z')
    self.licrx = re.compile(r'\A\s+"')
    self.ricrx = re.compile(r'"*\s*\Z')
    self.octrx = re.compile(r'\A\\[0-9][0-9][0-9]')
    self.dlmrx = re.compile(r'\\,')
    self.unique = None
    self.types = {
      'int_2s' : int,
      'int_4s' : int,
      'int_8s' : long,
      'real_4' : float,
      'real_8' : float,
      'lstring' : self.__lstring,
      'ilwd:char' : self.__ilwdchar,
      'ilwd:char_u' : self.__ilwdchar
    }
    self.xmltostr = Xlator({ r'&amp;' : r'&', r'&gt;' : r'>', r'&lt;' : r'<','\\\\' : '\\'})
    # Note: see https://www.gravity.phy.syr.edu/dokuwiki/doku.php?id=rpfisher:gluebughunt
    # if this is confusing; the parser just cleanly handles the conversion of everything

  def __del__(self):
    if self.unique:
      del self.unique

  def __lstring(self,lstr):
    """
    Returns a parsed lstring by stripping out any instances of
    the escaped delimiter. Sometimes the raw lstring has whitespace
    and a double quote at the beginning or end. If present, these
    are removed.
    """
    lstr = self.llsrx.sub('',lstr.encode('ascii'))
    lstr = self.rlsrx.sub('',lstr)
    lstr = self.xmltostr.xlat(lstr)
    lstr = self.dlmrx.sub(',',lstr)
    return lstr

  def __ilwdchar(self,istr):
    """
    If the ilwd:char field contains octal data, it is translated
    to a binary string and returned. Otherwise a lookup is done
    in the unique id dictionary and a binary string containing the
    correct unique id is returned.
    """
    istr_orig = istr
    istr = self.licrx.sub('',istr.encode('ascii'))
    istr = self.ricrx.sub('',istr)
    if self.octrx.match(istr):
      exec("istr = '"+istr+"'")
      # if the DB2 module is loaded, the string should be converted
      # to an instance of the DB2.Binary class. If not, leave it as
      # a string containing binary data.
      try:
        istr = DB2.Binary(istr)
      except:
        pass
    else:
      try:
        istr = self.unique.lookup(istr)
      except AttributeError:
        if not self.unique:
          istr = istr_orig
        else:
          raise LIGOLwParseError('unique id table has not been initialized')
    return istr

  def parsetuple(self,xmltuple):
    """
    Parse an XML tuple returned by pyRXP into a dictionary
    of LIGO metadata elements. The dictionary contains one
    entry for each table found in the XML tuple.
    """
    # first extract all the table and columns from the tuple from the
    # children of the ligo lightweight parent tuple
    table = {}
    tupleidx = 0
    for tag in xmltuple[2]:
      if tag[0] == 'Table' or tag[0] == 'table':
        tab = tag[1]['Name'].encode('ascii').lower()
        try:
          tab = self.tabrx.match(tab).group(2)
        except AttributeError:
          raise LIGOLwParseError('unable to parse a valid table name '+tab)
        # initialize the table dictionary for this table
        table[tab] = {
          'pos' : tupleidx,
          'column' : {},
          'stream' : (),
          'query' : ''
        }
        # parse for columns in the tables children
        # look for the column name and type in the attributes
        # store the index in which the columns were found as
        # we need this to decode the stream later
        for subtag in tag[2]:
          if subtag[0] == 'Column' or subtag[0] == 'column':
            col = subtag[1]['Name'].encode('ascii').lower()
            try:
              col = self.colrx.match(col).group(3)
            except AttributeError:
              raise LIGOLwParseError('unable to parse a valid column name '+col)
            try:
              typ = subtag[1]['Type'].encode('ascii').lower()
            except KeyError:
              raise LIGOLwParseError('type is missing for column '+col)
            table[tab]['column'][col] = typ
            table[tab].setdefault('orderedcol',[]).append(col)
      tupleidx += 1

    # now iterate the dictionary of tables we have created looking for streams
    for tab in table.keys():
      for tag in xmltuple[2][table[tab]['pos']][2]:
        if tag[0] == 'Stream' or tag[0] == 'stream':
          # store the stream delimiter and create the escape regex
          try:
            delim = tag[1]['Delimiter'].encode('ascii')
          except KeyError:
            raise LIGOLwParseError('stream is missing delimiter')
          if delim != ',':
            raise LIGOLwParseError('unable to handle stream delimiter: '+delim)

          # If the result set is empty tag[2] is an empty array, which causes
          # the next step to fail. Add an empty string in this case.
          if len(tag[2]) == 0:
            tag[2].append("")

          # strip newlines from the stream and parse it
          stream = next(csv.reader([re.sub(r'\n','',tag[2][0])],LIGOLWStream))

          # turn the csv stream into a list of lists
          slen = len(stream)
          ntyp = len(table[tab]['column'])
          mlen, lft = divmod(slen,ntyp)
          if lft != 0:
            raise LIGOLwParseError('invalid stream length for given columns')
          lst = [[None] * ntyp for i in range(mlen)]

          # translate the stream data to the correct data types
          for i in range(slen):
            j, k = divmod(i,ntyp)
            try:
              thiscol = table[tab]['orderedcol'][k]
              if len( stream[i] ) == 0:
                lst[j][k] = None
              else:
                lst[j][k] = self.types[table[tab]['column'][thiscol]](stream[i])
            except (KeyError, ValueError) as errmsg:
              msg = "stream translation error (%s) " % str(errmsg)
              msg += "for column %s in table %s: %s -> %s" \
                % (thiscol,tab,stream[i],str(table[tab]))
              raise LIGOLwParseError(msg)
          table[tab]['stream'] = list(map(tuple,lst))

    # return the created table to the caller
    return table

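The intended call pattern, sketched below, mirrors LIGOMetadata.parse further down; it assumes pyRXP is installed and that the input file (a placeholder name here) contains a process table. The encode('ascii') calls in this class date from Python 2, so the sketch is written for that environment.

import pyRXP

xmlparser = pyRXP.Parser()
lwtparser = LIGOLwParser()
xmldoc = open('input.xml').read()                         # placeholder file name
xmldoc = "".join([x.strip() for x in xmldoc.split('\n')])
table = lwtparser.parsetuple(xmlparser(xmldoc))
print(table['process']['orderedcol'])   # column order of the process table
print(table['process']['stream'])       # rows as a list of tuples
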
class LIGOMetadata:
  """
  LIGO Metadata object class. Contains methods for parsing a LIGO
  lightweight XML file and inserting it into a database, executing
  an SQL query to retrieve data from the database and writing it
  to a LIGO lightweight XML file
  """
  def __init__(self,xmlparser=None,lwtparser=None,ldb=None):
    """
    Connects to the database and creates a cursor. Initializes the unique
    id table for this LIGO lw document.

    ldb = LIGOMetadataDatabase object
    xmlparser = pyRXP XML to tuple parser object
    lwtparser = LIGOLwParser object (tuple parser)
    """
    self.ldb = ldb
    if self.ldb:
      self.dbcon = DB2.connect(dsn=self.ldb.database, uid='', pwd='')
      self.curs = self.dbcon.cursor()
    else:
      self.dbcon = None
      self.curs = None
    self.xmlparser = xmlparser
    self.lwtparser = lwtparser
    if lwtparser:
      self.lwtparser.unique = None
    self.table = {}
    self.strtoxml = Xlator({ r'&' : r'&amp;', r'>' : r'&gt;', r'<' : r'&lt;', '\\' : '\\\\', '\"' : '\\\"' })
    # Note: see https://www.gravity.phy.syr.edu/dokuwiki/doku.php?id=rpfisher:gluebughunt
    # if this is confusing; the parser just cleanly handles the conversion of everything

  def __del__(self):
    if self.curs:
      self.curs.close()
    if self.dbcon:
      self.dbcon.close()

  def reset(self):
    """Clear any existing table"""
    if self.table:
      del self.table
      self.table = {}

  def parse(self,xml):
    """
    Parses an XML document into a form ready for insertion into the database

    xml = the xml document to be parsed
    """
    if not self.xmlparser:
      raise LIGOLwParseError("pyRXP parser not initialized")
    if not self.lwtparser:
      raise LIGOLwParseError("LIGO_LW tuple parser not initialized")
    xml = "".join([x.strip() for x in xml.split('\n')])
    ligolwtup = self.xmlparser(xml)
    if self.curs:
      self.lwtparser.unique = UniqueIds(self.curs)
    self.table = self.lwtparser.parsetuple(ligolwtup)

  def add_lfn(self,lfn):
    """
    Add an LFN table to a parsed LIGO_LW XML document.

    lfn = lfn to be added
    """
    if len(self.table['process']['stream']) > 1:
      msg = "cannot add lfn to table with more than one process"
      raise LIGOLwParseError(msg)
    # get the process_id from the process table
    pid_col = self.table['process']['orderedcol'].index('process_id')
    pid = self.table['process']['stream'][0][pid_col]
    try:
      self.table['lfn']['stream'].append((pid,lfn))
    except KeyError:
      self.table['lfn'] = {
        'pos' : 0,
        'column' : {'process_id' : 'ilwd:char', 'name' : 'lstring'},
        'stream' : [(pid, lfn)],
        'query' : '',
        'orderedcol' : ['process_id', 'name' ]
      }

  def set_dn(self,dn):
    """
    Use the domain column in the process table to store the DN

    dn = dn to be added
    """
    try:
      domain_col = self.table['process']['orderedcol'].index('domain')
      for row_idx in range(len(self.table['process']['stream'])):
        row_list = list(self.table['process']['stream'][row_idx])
        row_list[domain_col] = dn
        self.table['process']['stream'][row_idx] = tuple(row_list)
    except ValueError:
      self.table['process']['column']['domain'] = 'lstring'
      self.table['process']['orderedcol'].append('domain')
      for row_idx in range(len(self.table['process']['stream'])):
        row_list = list(self.table['process']['stream'][row_idx])
        row_list.append(dn)
        self.table['process']['stream'][row_idx] = tuple(row_list)

  def insert(self):
    """Insert the object into the database"""
    if not self.curs:
      raise LIGOLwDBError("Database connection not initialized")
    if len(self.table) == 0:
      raise LIGOLwDBError('attempt to insert empty table')
    for tab in self.table.keys():
      # find and add any missing unique ids
      generate = []
      missingcols = [k for k in self.ldb.uniqueids[tab]
        if k not in self.table[tab]['column']]
      for m in missingcols:
        generate.append(',BLOB(GENERATE_UNIQUE())')
        self.table[tab]['orderedcol'].append(m)
      # and construct the sql query
      self.table[tab]['query'] = ' '.join(
        ['INSERT INTO', tab, '(', ','.join(self.table[tab]['orderedcol']),
        ') VALUES (', ','.join(['?' for x in self.table[tab]['column']]) ,
        ''.join(generate), ')'])
    for tabtup in self.ldb.tables:
      tab = tabtup[0].lower()
      try:
        try:
          self.curs.executemany(self.table[tab]['query'],
            self.table[tab]['stream'])
          rowcount = self.curs.rowcount
        except DB2.Error as e:
          self.curs.execute('rollback')
          msg = e[2]
          msg += self.xml() + '\n'
          msg += str(self.table[tab]['query']) + '\n'
          msg += str(self.table[tab]['stream']) + '\n'
          raise LIGOLwDBError(msg)
        except DB2.Warning as e:
          self.curs.execute('rollback')
          raise LIGOLwDBError(e[2])
        #except Exception, e:
        #  self.curs.execute('rollback')
        #  raise LIGOLwDBError, e[2]
      except KeyError:
        pass
    self.curs.execute('commit')
    return rowcount

  def select(self,sql):
    """
    Execute an SQL select statement and stuff the results into a
    dictionary.

    sql = the (case sensitive) SQL statement to execute
    """
    if not self.curs:
      raise LIGOLwDBError("Database connection not initialized")
    if len(self.table) != 0:
      raise LIGOLwDBError('attempt to fill non-empty table from database')
    ligolw = ''
    self.table = {}
    sqltypes = {
      -2 : 'ilwd:char_u',
      1 : 'lstring',
      3 : 'real_8',
      4 : 'int_4s',
      5 : 'int_2s',
      7 : 'real_4',
      8 : 'real_8',
      12 : 'lstring',
      93 : 'lstring',
    }
    try:
      tab = re.compile(r'[Ff][Rr][Oo][Mm]\s+([A-Za-z0-9_]+)([,\s]+|$)').search(sql).group(1)
    except AttributeError:
      raise LIGOLwDBError('could not find table name in query ' + str(sql))
    self.table[tab] = {
      'pos' : 0,
      'column' : {},
      'stream' : (),
      'query' : sql
    }
    try:
      self.curs.execute(sql)
    except DB2.Error as e:
      raise LIGOLwDBError(e[2])
    desc = self.curs.description
    for col,typ,disp,intsz,prec,sca,nul in desc:
      try:
        self.table[tab]['column'][col] = sqltypes[typ]
      except KeyError:
        raise LIGOLwDBError('unknown type returned by database ' + str(typ))
      self.table[tab].setdefault('orderedcol',[]).append(col)

    try:
      self.table[tab]['stream'] = self.curs.fetchall()
    except DB2.Error as e:
      raise LIGOLwDBError(e[2])

    return len(self.table[tab]['stream'])

  def xml(self, ilwdchar_to_hex = True):
    """Convert a table dictionary to LIGO lightweight XML"""
    if len(self.table) == 0:
      raise LIGOLwDBError('attempt to convert empty table to xml')
    ligolw = """\
<?xml version='1.0' encoding='utf-8' ?>
<?xml-stylesheet type="text/xsl" href="ligolw.xsl"?>
<!DOCTYPE LIGO_LW SYSTEM "http://ldas-sw.ligo.caltech.edu/doc/ligolwAPI/html/ligolw_dtd.txt">
<LIGO_LW>
"""

    for tab in self.table.keys():
      try:
        ligolw += ' <Comment>'+self.strtoxml.xlat(self.table[tab]['query'])+'</Comment>\n'
      except KeyError:
        pass
      ligolw += ' <Table Name="'+tab+':table">\n'
      for col in self.table[tab]['orderedcol']:
        ligolw += ' <Column Name="'+tab.lower()+':'+col.lower()+'" Type="'+self.table[tab]['column'][col].lower()+'"/>\n'
      ligolw += ' <Stream Name="'+tab.lower()+':table" Type="Local" Delimiter=",">\n'
      stridx = 0
      ligolw += ' '
      for tup in self.table[tab]['stream']:
        if stridx != 0:
          ligolw += ',\n '
        colidx = 0
        for tupi in tup:
          if tupi is not None:
            coltype = self.table[tab]['column'][self.table[tab]['orderedcol'][colidx]]
            if re.match(r'\Ailwd:char_u\Z',coltype):
              ligolw += '"'
              for ch in str(tupi):
                # NOTE: escape the backslash in the ilwd:char_u octal string
                ligolw += '\\\\%.3o' % (ord(ch))
              ligolw += '"'
            elif re.match(r'\Ailwd:char\Z',coltype):
              if ilwdchar_to_hex is True:
                # encode in DB2-style hex (e.g., "x'deadbeef'")
                ligolw += '"x\''
                for ch in str(tupi):
                  ligolw += "%02x" % ord(ch)
                ligolw += '\'"'
              else:
                ligolw += '"' + str(tupi) + '"'
            elif re.match(r'\Alstring\Z',coltype):
              # this sanitizes the contents of tupi in several ways:
              # strtoxml.xlat escapes any double-quote and
              # backslash chars (with a preceding backslash); and
              # then replaces <>& chars with their html
              # code equivalents
              # NOTE: string_format_func was removed so the enclosing ""
              # chars need to be added ourselves
              ligolw += '"'+self.strtoxml.xlat(tupi)+'"'
            elif re.match(r'\Areal_4\Z',coltype):
              ligolw += '%13.7e' % tupi
            elif re.match(r'\Areal_8\Z',coltype):
              ligolw += '%22.16e' % tupi
            else:
              ligolw += str(tupi)
          else:
            ligolw += ''
          if colidx < (len(self.table[tab]['column']) - 1):
            ligolw += ','
          colidx += 1
        stridx += 1
      ligolw += '\n </Stream>\n'
      ligolw += ' </Table>\n'
    ligolw += '</LIGO_LW>'

    return ligolw
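
Finally, a hedged end-to-end sketch (not part of the module) of the select/xml round trip: it requires the DB2 module, a reachable LIGO metadata database, and pyRXP; the DSN, query, and output file name are placeholders.

import pyRXP

ldb = LIGOMetadataDatabase('ldbd_tst')                    # placeholder DSN
lmd = LIGOMetadata(pyRXP.Parser(), LIGOLwParser(), ldb)
nrows = lmd.select("SELECT process_id,program FROM process")
print("%d rows selected" % nrows)
open('process.xml', 'w').write(lmd.xml())                 # placeholder output file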