
Source Code for Module glue.ligolw.dbtables

# Copyright (C) 2007-2016  Kipp Cannon
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#


"""
This module provides an implementation of the Table element that uses a
database engine for storage.  On top of that it then re-implements a number
of the tables from the lsctables module to provide versions of their
methods that work against the SQL database.
"""


import itertools
import operator
import os
import re
import shutil
import signal
import sys
import tempfile
import threading
from xml.sax.xmlreader import AttributesImpl
import warnings


from glue import git_version
from glue import offsetvector
from glue import segments
from . import ilwd
from . import ligolw
from . import table
from . import lsctables
from . import types as ligolwtypes
import six


__author__ = "Kipp Cannon <kipp.cannon@ligo.org>"
__version__ = "git id %s" % git_version.id
__date__ = git_version.date


#
# =============================================================================
#
#                                  Connection
#
# =============================================================================
#


def connection_db_type(connection):
    """
    A totally broken attempt to determine what type of database a
    connection object is attached to.  Don't use this.

    The input is a DB API 2.0 compliant connection object, the return
    value is one of the strings "sqlite" or "mysql".  Raises TypeError
    when the database type cannot be determined.
    """
    if "sqlite" in repr(connection):
        return "sqlite"
    if "mysql" in repr(connection):
        return "mysql"
    raise TypeError(connection)
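
# A minimal usage sketch (an editorial addition, not part of the original
# module).  It assumes only the standard library sqlite3 module; the check
# works because "sqlite" appears in the repr() of a sqlite3 connection:
#
#   >>> import sqlite3
#   >>> connection = sqlite3.connect(":memory:")
#   >>> connection_db_type(connection)
#   'sqlite'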


#
# Module-level variable used to hold references to
# tempfile.NamedTemporaryFile objects to prevent them from being deleted
# while in use.  NOT MEANT FOR USE BY CODE OUTSIDE OF THIS MODULE!
#


temporary_files = {}
temporary_files_lock = threading.Lock()


#
# Module-level variable to hold the signal handlers that have been
# overridden as part of the clean-up-scratch-files-on-signal feature.  NOT
# MEANT FOR USE BY CODE OUTSIDE OF THIS MODULE!
#


origactions = {}


def install_signal_trap(signums = (signal.SIGTERM, signal.SIGTSTP), retval = 1):
    """
    Installs a signal handler to erase temporary scratch files when a
    signal is received.  This can be used to help ensure scratch files
    are erased when jobs are evicted by Condor.  signums is a sequence
    of the signals to trap; the default value is a list of the signals
    used by Condor to kill and/or evict jobs.

    The logic is as follows.  If the current signal handler is
    signal.SIG_IGN, i.e. the signal is being ignored, then the signal
    handler is not modified since the reception of that signal would
    not normally cause a scratch file to be leaked.  Otherwise a signal
    handler is installed that erases the scratch files.  If the
    original signal handler was a Python callable, then after the
    scratch files are erased the original signal handler will be
    invoked.  If program control returns from that handler, i.e. that
    handler does not cause the interpreter to exit, then sys.exit() is
    invoked and retval is returned to the shell as the exit code.

    Note:  by invoking sys.exit(), the signal handler causes the Python
    interpreter to do a normal shutdown.  That means it invokes
    atexit() handlers, and does other garbage collection tasks that it
    normally would not do when killed by a signal.

    Note:  this function will not replace a signal handler more than
    once; that is, if it has already been used to set a handler on a
    signal then it will be a no-op when called again for that signal
    until uninstall_signal_trap() is used to remove the handler from
    that signal.

    Note:  this function is called by get_connection_filename()
    whenever it creates a scratch file.
    """
    # NOTE:  this must be called with the temporary_files_lock held.
    # ignore signums we've already replaced
    signums = set(signums) - set(origactions)

    def temporary_file_cleanup_on_signal(signum, frame):
        with temporary_files_lock:
            temporary_files.clear()
        if callable(origactions[signum]):
            # original action is callable, chain to it
            return origactions[signum](signum, frame)
        # original action was not callable or the callable
        # returned.  invoke sys.exit() with retval as exit code
        sys.exit(retval)

    for signum in signums:
        origactions[signum] = signal.getsignal(signum)
        if origactions[signum] != signal.SIG_IGN:
            # signal is not being ignored, so install our
            # handler
            signal.signal(signum, temporary_file_cleanup_on_signal)


def uninstall_signal_trap(signums = None):
    """
    Undo the effects of install_signal_trap().  Restores the original
    signal handlers.  If signums is a sequence of signal numbers, only
    the signal handlers for those signals will be restored (KeyError
    will be raised if one of them is not one that install_signal_trap()
    installed a handler for, in which case some undefined number of
    handlers will have been restored).  If signums is None (the
    default) then all signals that have been modified by previous calls
    to install_signal_trap() are restored.

    Note:  this function is called by put_connection_filename() and
    discard_connection_filename() whenever they remove a scratch file
    and there are then no more scratch files in use.
    """
    # NOTE:  this must be called with the temporary_files_lock held.
    if signums is None:
        signums = list(origactions.keys())
    for signum in signums:
        signal.signal(signum, origactions.pop(signum))


#
# Functions to work with database files in scratch space
#


def get_connection_filename(filename, tmp_path = None, replace_file = False, verbose = False):
    """
    Utility code for moving database files to a (presumably local)
    working location for improved performance and reduced fileserver
    load.
    """
    def mktmp(path, suffix = ".sqlite", verbose = False):
        with temporary_files_lock:
            # make sure the clean-up signal traps are installed
            install_signal_trap()
            # create the temporary file and replace its
            # unlink() function
            temporary_file = tempfile.NamedTemporaryFile(suffix = suffix, dir = path if path != "_CONDOR_SCRATCH_DIR" else os.getenv("_CONDOR_SCRATCH_DIR"))
            def new_unlink(self, orig_unlink = temporary_file.unlink):
                # also remove a -journal partner, ignore all errors
                try:
                    orig_unlink("%s-journal" % self)
                except:
                    pass
                orig_unlink(self)
            temporary_file.unlink = new_unlink
            filename = temporary_file.name
            # hang onto reference to prevent its removal
            temporary_files[filename] = temporary_file
        if verbose:
            sys.stderr.write("using '%s' as workspace\n" % filename)
        # mkstemp() ignores umask, creates all files accessible
        # only by owner;  we should respect umask.  note that
        # os.umask() sets it, too, so we have to set it back after
        # we know what it is
        umsk = os.umask(0o777)
        os.umask(umsk)
        os.chmod(filename, 0o666 & ~umsk)
        return filename

    def truncate(filename, verbose = False):
        if verbose:
            sys.stderr.write("'%s' exists, truncating ... " % filename)
        try:
            fd = os.open(filename, os.O_WRONLY | os.O_TRUNC)
        except Exception as e:
            if verbose:
                sys.stderr.write("cannot truncate '%s': %s\n" % (filename, str(e)))
            return
        os.close(fd)
        if verbose:
            sys.stderr.write("done.\n")

    def cpy(srcname, dstname, verbose = False):
        if verbose:
            sys.stderr.write("copying '%s' to '%s' ... " % (srcname, dstname))
        shutil.copy2(srcname, dstname)
        if verbose:
            sys.stderr.write("done.\n")
        try:
            # try to preserve permission bits.  according to
            # the documentation, copy() and copy2() are
            # supposed to preserve them but don't.  maybe they
            # don't preserve them if the destination file
            # already exists?
            shutil.copystat(srcname, dstname)
        except Exception as e:
            if verbose:
                sys.stderr.write("warning: ignoring failure to copy permission bits from '%s' to '%s': %s\n" % (srcname, dstname, str(e)))

    database_exists = os.access(filename, os.F_OK)

    if tmp_path is not None:
        # for suffix, can't use splitext() because it only keeps
        # the last bit, e.g. won't give ".xml.gz" but just ".gz"
        target = mktmp(tmp_path, suffix = ".".join(os.path.split(filename)[-1].split(".")[1:]), verbose = verbose)
        if database_exists:
            if replace_file:
                # truncate database so that if this job
                # fails the user won't think the database
                # file is valid
                truncate(filename, verbose = verbose)
            else:
                # need to copy existing database to work
                # space for modifications
                i = 1
                while True:
                    try:
                        cpy(filename, target, verbose = verbose)
                    except IOError as e:
                        import errno
                        import time
                        if e.errno not in (errno.EPERM, errno.ENOSPC):
                            # anything other than
                            # permission-denied or
                            # out-of-space is a
                            # real error
                            raise
                        if i < 5:
                            if verbose:
                                sys.stderr.write("warning: attempt %d: %s, sleeping and trying again ...\n" % (i, errno.errorcode[e.errno]))
                            time.sleep(10)
                            i += 1
                            continue
                        if verbose:
                            sys.stderr.write("warning: attempt %d: %s: working with original file '%s'\n" % (i, errno.errorcode[e.errno], filename))
                        with temporary_files_lock:
                            del temporary_files[target]
                        target = filename
                    break
    else:
        with temporary_files_lock:
            if filename in temporary_files:
                raise ValueError("file '%s' appears to be in use already as a temporary database file and is to be deleted" % filename)
        target = filename
        if database_exists and replace_file:
            truncate(target, verbose = verbose)

    del mktmp
    del truncate
    del cpy

    return target


def set_temp_store_directory(connection, temp_store_directory, verbose = False):
    """
    Sets the temp_store_directory parameter in sqlite.
    """
    if temp_store_directory == "_CONDOR_SCRATCH_DIR":
        temp_store_directory = os.getenv("_CONDOR_SCRATCH_DIR")
    if verbose:
        sys.stderr.write("setting the temp_store_directory to %s ... " % temp_store_directory)
    cursor = connection.cursor()
    cursor.execute("PRAGMA temp_store_directory = '%s'" % temp_store_directory)
    cursor.close()
    if verbose:
        sys.stderr.write("done\n")
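
# Usage sketch (editorial addition; "/tmp" is an illustrative path, and the
# PRAGMA used above is only honoured by sqlite versions that still support
# temp_store_directory).  Passing the literal string "_CONDOR_SCRATCH_DIR"
# substitutes the value of that environment variable:
#
#   >>> import sqlite3
#   >>> connection = sqlite3.connect(":memory:")
#   >>> set_temp_store_directory(connection, "/tmp", verbose = True)
#   setting the temp_store_directory to /tmp ... done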


def put_connection_filename(filename, working_filename, verbose = False):
    """
    This function reverses the effect of a previous call to
    get_connection_filename(), restoring the working copy to its
    original location if the two are different.  This function should
    always be called after calling get_connection_filename() when the
    file is no longer in use.

    During the move operation, this function traps the signals used by
    Condor to evict jobs.  This reduces the risk of corrupting a
    document by the job terminating part-way through the restoration of
    the file to its original location.  When the move operation is
    concluded, the original signal handlers are restored and if any
    signals were trapped they are resent to the current process in
    order.  Typically this will result in the signal handlers installed
    by the install_signal_trap() function being invoked, meaning any
    other scratch files that might be in use get deleted and the
    current process is terminated.
    """
    if working_filename != filename:
        # initialize SIGTERM and SIGTSTP trap
        deferred_signals = []
        def newsigterm(signum, frame):
            deferred_signals.append(signum)
        oldhandlers = {}
        for sig in (signal.SIGTERM, signal.SIGTSTP):
            oldhandlers[sig] = signal.getsignal(sig)
            signal.signal(sig, newsigterm)

        # replace document
        if verbose:
            sys.stderr.write("moving '%s' to '%s' ... " % (working_filename, filename))
        shutil.move(working_filename, filename)
        if verbose:
            sys.stderr.write("done.\n")

        # remove reference to tempfile.TemporaryFile object.
        # because we've just deleted the file above, this would
        # produce an annoying but harmless message about an ignored
        # OSError, so we create a dummy file for the TemporaryFile
        # to delete.  ignore any errors that occur when trying to
        # make the dummy file.  FIXME: this is stupid, find a
        # better way to shut TemporaryFile up
        try:
            open(working_filename, "w").close()
        except:
            pass
        with temporary_files_lock:
            del temporary_files[working_filename]

        # restore original handlers, and send ourselves any trapped
        # signals in order
        for sig, oldhandler in six.iteritems(oldhandlers):
            signal.signal(sig, oldhandler)
        while deferred_signals:
            os.kill(os.getpid(), deferred_signals.pop(0))

    # if there are no more temporary files in place, remove the
    # temporary-file signal traps
    with temporary_files_lock:
        if not temporary_files:
            uninstall_signal_trap()


def discard_connection_filename(filename, working_filename, verbose = False):
    """
    Like put_connection_filename(), but the working copy is simply
    deleted instead of being copied back to its original location.
    This is a useful performance boost if it is known that no
    modifications were made to the file, for example if queries were
    performed but no updates.

    Note that the file is not deleted if the working copy and original
    file are the same, so it is always safe to call this function after
    a call to get_connection_filename() even if a separate working copy
    is not created.
    """
    if working_filename == filename:
        return
    with temporary_files_lock:
        if verbose:
            sys.stderr.write("removing '%s' ... " % working_filename)
        # remove reference to tempfile.TemporaryFile object
        del temporary_files[working_filename]
        if verbose:
            sys.stderr.write("done.\n")
        # if there are no more temporary files in place, remove the
        # temporary-file signal traps
        if not temporary_files:
            uninstall_signal_trap()
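
# A typical scratch-space round trip (editorial sketch; "data.sqlite" and
# "/usr1/scratch" are hypothetical paths):
#
#   >>> import sqlite3
#   >>> working_filename = get_connection_filename("data.sqlite", tmp_path = "/usr1/scratch", verbose = True)
#   >>> connection = sqlite3.connect(working_filename)
#   >>> # ... queries and/or updates ...
#   >>> connection.close()
#   >>> # move the file back if it was modified ...
#   >>> put_connection_filename("data.sqlite", working_filename, verbose = True)
#   >>> # ... or, if it was only read, simply delete the working copy:
#   >>> # discard_connection_filename("data.sqlite", working_filename, verbose = True)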


#
# =============================================================================
#
#                                  ID Mapping
#
# =============================================================================
#


def idmap_create(connection):
    """
    Create the _idmap_ table.  This table has columns "old" and "new"
    containing text strings mapping old IDs to new IDs.  The old column
    is a primary key (is indexed and must contain unique entries).  The
    table is created as a temporary table, so it will be automatically
    dropped when the database connection is closed.

    This function is for internal use; it forms part of the code used
    to re-map row IDs when merging multiple documents.
    """
    connection.cursor().execute("CREATE TEMPORARY TABLE _idmap_ (old TEXT PRIMARY KEY NOT NULL, new TEXT NOT NULL)")


def idmap_reset(connection):
    """
    Erase the contents of the _idmap_ table, but leave the table in
    place.

    This function is for internal use; it forms part of the code used
    to re-map row IDs when merging multiple documents.
    """
    connection.cursor().execute("DELETE FROM _idmap_")


def idmap_sync(connection):
    """
    Iterate over the tables in the database, ensure that there exists a
    custom DBTable class for each, and synchronize that table's ID
    generator to the ID values in the database.
    """
    xmldoc = get_xml(connection)
    for tbl in xmldoc.getElementsByTagName(DBTable.tagName):
        tbl.sync_next_id()
    xmldoc.unlink()


def idmap_get_new(connection, old, tbl):
    """
    From the old ID string, obtain a replacement ID string by either
    grabbing it from the _idmap_ table if one has already been assigned
    to the old ID, or by using the current value of the Table
    instance's next_id class attribute.  In the latter case, the new ID
    is recorded in the _idmap_ table, and the class attribute
    incremented by 1.

    This function is for internal use; it forms part of the code used
    to re-map row IDs when merging multiple documents.
    """
    cursor = connection.cursor()
    cursor.execute("SELECT new FROM _idmap_ WHERE old == ?", (old,))
    new = cursor.fetchone()
    if new is not None:
        # a new ID has already been created for this old ID
        return ilwd.ilwdchar(new[0])
    # this ID was not found in _idmap_ table, assign a new ID and
    # record it
    new = tbl.get_next_id()
    cursor.execute("INSERT INTO _idmap_ VALUES (?, ?)", (old, new))
    return new


def idmap_get_max_id(connection, id_class):
    """
    Given an ilwd:char ID class, return the highest ID from the table
    for whose IDs that is the class.

    Example:

    >>> event_id = ilwd.ilwdchar("sngl_inspiral:event_id:0")
    >>> print(event_id)
    sngl_inspiral:event_id:0
    >>> max_id = idmap_get_max_id(connection, type(event_id))
    >>> print(max_id)
    sngl_inspiral:event_id:1054
    """
    cursor = connection.cursor()
    cursor.execute("SELECT MAX(CAST(SUBSTR(%s, %d, 10) AS INTEGER)) FROM %s" % (id_class.column_name, id_class.index_offset + 1, id_class.table_name))
    maxid = cursor.fetchone()[0]
    cursor.close()
    if maxid is None:
        return None
    return id_class(maxid)


#
# =============================================================================
#
#                             Database Information
#
# =============================================================================
#


#
# SQL parsing
#


_sql_create_table_pattern = re.compile(r"CREATE\s+TABLE\s+(?P<name>\w+)\s*\((?P<coldefs>.*)\)", re.IGNORECASE)
_sql_coldef_pattern = re.compile(r"\s*(?P<name>\w+)\s+(?P<type>\w+)[^,]*")


#
# Database info extraction utils
#


def get_table_names(connection):
    """
    Return a list of the table names in the database.
    """
    cursor = connection.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type == 'table'")
    return [name for (name,) in cursor]
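
# Usage sketch (editorial addition; assumes a database that already holds
# LIGO Light Weight tables, so the names shown are illustrative):
#
#   >>> import sqlite3
#   >>> connection = sqlite3.connect("data.sqlite")
#   >>> get_table_names(connection)
#   ['process', 'process_params', 'time_slide']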


def get_column_info(connection, table_name):
    """
    Return an in-order list of (name, type) tuples describing the
    columns in the given table.
    """
    cursor = connection.cursor()
    cursor.execute("SELECT sql FROM sqlite_master WHERE type == 'table' AND name == ?", (table_name,))
    statement, = cursor.fetchone()
    coldefs = re.match(_sql_create_table_pattern, statement).groupdict()["coldefs"]
    return [(coldef.groupdict()["name"], coldef.groupdict()["type"]) for coldef in re.finditer(_sql_coldef_pattern, coldefs) if coldef.groupdict()["name"].upper() not in ("PRIMARY", "UNIQUE", "CHECK")]


def get_xml(connection, table_names = None):
    """
    Construct an XML document tree wrapping around the contents of the
    database.  On success the return value is a ligolw.LIGO_LW element
    containing the tables as children.  Arguments are a connection to a
    database, and an optional list of table names to dump.  If
    table_names is not provided the set is obtained from
    get_table_names().
    """
    ligo_lw = ligolw.LIGO_LW()

    if table_names is None:
        table_names = get_table_names(connection)

    for table_name in table_names:
        # build the table document tree.  copied from
        # lsctables.New()
        try:
            cls = TableByName[table_name]
        except KeyError:
            cls = DBTable
        table_elem = cls(AttributesImpl({u"Name": u"%s:table" % table_name}), connection = connection)
        for column_name, column_type in get_column_info(connection, table_elem.Name):
            if table_elem.validcolumns is not None:
                # use the pre-defined column type
                column_type = table_elem.validcolumns[column_name]
            else:
                # guess the column type
                column_type = ligolwtypes.FromSQLiteType[column_type]
            table_elem.appendChild(table.Column(AttributesImpl({u"Name": u"%s:%s" % (table_name, column_name), u"Type": column_type})))
        table_elem._end_of_columns()
        table_elem.appendChild(table.TableStream(AttributesImpl({u"Name": u"%s:table" % table_name, u"Delimiter": table.TableStream.Delimiter.default, u"Type": table.TableStream.Type.default})))
        ligo_lw.appendChild(table_elem)
    return ligo_lw
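
# Sketch of extracting a database's contents back to XML (editorial
# addition; the file names are hypothetical).  glue.ligolw.utils provides
# the writer:
#
#   >>> import sqlite3
#   >>> from glue.ligolw import utils as ligolw_utils
#   >>> connection = sqlite3.connect("data.sqlite")
#   >>> xmldoc = ligolw.Document()
#   >>> xmldoc.appendChild(get_xml(connection))
#   >>> ligolw_utils.write_filename(xmldoc, "data.xml.gz", gz = True)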


#
# =============================================================================
#
#                            DBTable Element Class
#
# =============================================================================
#


class DBTable(table.Table):
    """
    A special version of the Table class using an SQL database for
    storage.  Many of the features of the Table class are not available
    here, but instead the user can use SQL to query the table's
    contents.

    The constraints attribute can be set to a text string that will be
    added to the table's CREATE statement where constraints go; for
    example you might wish to set this to "PRIMARY KEY (event_id)" for
    a table with an event_id column.

    Note:  because the table is stored in an SQL database, the use of
    this class imposes the restriction that table names be unique
    within a document.

    Also note that at the present time there is really only proper
    support for the pre-defined tables in the lsctables module.  It is
    possible to load unrecognized tables into a database from LIGO
    Light Weight XML files, but without developer intervention there is
    no way to indicate the constraints that should be imposed on the
    columns, for example which columns should be used as primary keys
    and so on.  This can result in poor query performance.  It is also
    possible to extract a database's contents to a LIGO Light Weight
    XML file even when the database contains unrecognized tables, but
    without developer intervention the column types will be guessed
    using a generic mapping of SQL types to LIGO Light Weight types.

    Each instance of this class must be connected to a database.  The
    (Python DBAPI 2.0 compatible) connection object is passed to the
    class via the connection parameter at instance creation time.

    Example:

    >>> import sqlite3
    >>> connection = sqlite3.connect(":memory:")
    >>> tbl = dbtables.DBTable(AttributesImpl({u"Name": u"process:table"}), connection = connection)

    A custom content handler must be created in order to pass the
    connection keyword argument to the DBTable class when instances are
    created, since the default content handler does not do this.  See
    the use_in() function defined in this module for information on how
    to create such a content handler.

    If a custom glue.ligolw.Table subclass is defined in
    glue.ligolw.lsctables whose name matches the name of the DBTable
    being constructed, the lsctables class is added to the list of
    parent classes.  This allows the lsctables class's methods to be
    used with the DBTable instances, but not all of the methods will
    necessarily work with the database-backed version of the class.
    Your mileage may vary.
    """
    def __new__(cls, *args, **kwargs):
        # does this class already have table-specific metadata?
        if not hasattr(cls, "tableName"):
            # no, try to retrieve it from lsctables
            attrs, = args
            name = table.Table.TableName(attrs[u"Name"])
            if name in lsctables.TableByName:
                # found metadata in lsctables, construct
                # custom subclass.  the class from
                # lsctables is added as a parent class to
                # allow methods from that class to be used
                # with this class, however there is no
                # guarantee that all parent class methods
                # will be appropriate for use with the
                # DB-backend object.
                lsccls = lsctables.TableByName[name]
                class CustomDBTable(cls, lsccls):
                    tableName = lsccls.tableName
                    validcolumns = lsccls.validcolumns
                    loadcolumns = lsccls.loadcolumns
                    constraints = lsccls.constraints
                    next_id = lsccls.next_id
                    RowType = lsccls.RowType
                    how_to_index = lsccls.how_to_index

                # save for re-use (required for ID
                # remapping across multiple documents in
                # ligolw_sqlite)
                TableByName[name] = CustomDBTable

                # replace input argument with new class
                cls = CustomDBTable
        return table.Table.__new__(cls, *args)

    def __init__(self, *args, **kwargs):
        # chain to parent class
        table.Table.__init__(self, *args)

        # retrieve connection object from kwargs
        self.connection = kwargs.pop("connection")

        # pre-allocate a cursor for internal queries
        self.cursor = self.connection.cursor()

    def copy(self, *args, **kwargs):
        """
        This method is not implemented.  See
        glue.ligolw.table.Table for more information.
        """
        raise NotImplementedError

    def _end_of_columns(self):
        table.Table._end_of_columns(self)
        # dbcolumnnames and types have the "not loaded" columns
        # removed
        if self.loadcolumns is not None:
            self.dbcolumnnames = [name for name in self.columnnames if name in self.loadcolumns]
            self.dbcolumntypes = [name for i, name in enumerate(self.columntypes) if self.columnnames[i] in self.loadcolumns]
        else:
            self.dbcolumnnames = self.columnnames
            self.dbcolumntypes = self.columntypes

        # create the table
        ToSQLType = {
            "sqlite": ligolwtypes.ToSQLiteType,
            "mysql": ligolwtypes.ToMySQLType
        }[connection_db_type(self.connection)]
        try:
            statement = "CREATE TABLE IF NOT EXISTS " + self.Name + " (" + ", ".join(map(lambda n, t: "%s %s" % (n, ToSQLType[t]), self.dbcolumnnames, self.dbcolumntypes))
        except KeyError as e:
            raise ValueError("column type '%s' not supported" % str(e))
        if self.constraints is not None:
            statement += ", " + self.constraints
        statement += ")"
        self.cursor.execute(statement)

        # record the highest internal row ID
        self.last_maxrowid = self.maxrowid() or 0

        # construct the SQL to be used to insert new rows
        params = {
            "sqlite": ",".join("?" * len(self.dbcolumnnames)),
            "mysql": ",".join(["%s"] * len(self.dbcolumnnames))
        }[connection_db_type(self.connection)]
        self.append_statement = "INSERT INTO %s (%s) VALUES (%s)" % (self.Name, ",".join(self.dbcolumnnames), params)
        self.append_attrgetter = operator.attrgetter(*self.dbcolumnnames)

    def _end_of_rows(self):
        # FIXME: is this needed?
        table.Table._end_of_rows(self)
        self.connection.commit()

    def sync_next_id(self):
        if self.next_id is not None:
            max_id = idmap_get_max_id(self.connection, type(self.next_id))
            if max_id is None:
                self.set_next_id(type(self.next_id)(0))
            else:
                self.set_next_id(max_id + 1)
        return self.next_id

    def maxrowid(self):
        self.cursor.execute("SELECT MAX(ROWID) FROM %s" % self.Name)
        return self.cursor.fetchone()[0]

    def __len__(self):
        self.cursor.execute("SELECT COUNT(*) FROM %s" % self.Name)
        return self.cursor.fetchone()[0]

    def __iter__(self):
        cursor = self.connection.cursor()
        cursor.execute("SELECT * FROM %s" % self.Name)
        for values in cursor:
            yield self.row_from_cols(values)

    # FIXME: is adding this a good idea?
    #def __delslice__(self, i, j):
    #    # sqlite numbers rows starting from 1:  [0:10] becomes
    #    # "rowid between 1 and 10" which means 1 <= rowid <= 10,
    #    # which is the intended range
    #    self.cursor.execute("DELETE FROM %s WHERE ROWID BETWEEN %d AND %d" % (self.Name, i + 1, j))

    def _append(self, row):
        """
        Standard .append() method.  This method is intended for
        internal use only.
        """
        self.cursor.execute(self.append_statement, self.append_attrgetter(row))

    def _remapping_append(self, row):
        """
        Replacement for the standard .append() method.  This
        version performs on-the-fly row ID reassignment, and so
        also performs the function of the updateKeyMapping()
        method.  SQLite does not permit the PRIMARY KEY of a row
        to be modified, so it needs to be done prior to insertion.
        This method is intended for internal use only.
        """
        if self.next_id is not None:
            # assign (and record) a new ID before inserting the
            # row to avoid collisions with existing rows
            setattr(row, self.next_id.column_name, idmap_get_new(self.connection, getattr(row, self.next_id.column_name), self))
        self._append(row)

    append = _append

    def row_from_cols(self, values):
        """
        Given an iterable of values in the order of columns in the
        database, construct and return a row object.  This is a
        convenience function for turning the results of database
        queries into Python objects.
        """
        row = self.RowType()
        for c, t, v in zip(self.dbcolumnnames, self.dbcolumntypes, values):
            if t in ligolwtypes.IDTypes:
                v = ilwd.ilwdchar(v)
            setattr(row, c, v)
        return row
    # backwards compatibility
    _row_from_cols = row_from_cols

    def applyKeyMapping(self):
        """
        Used as the second half of the key reassignment algorithm.
        Loops over each row in the table, replacing references to
        old row keys with the new values from the _idmap_ table.
        """
        assignments = ", ".join("%s = (SELECT new FROM _idmap_ WHERE old == %s)" % (colname, colname) for coltype, colname in zip(self.dbcolumntypes, self.dbcolumnnames) if coltype in ligolwtypes.IDTypes and (self.next_id is None or colname != self.next_id.column_name))
        if assignments:
            # SQLite documentation says ROWID is monotonically
            # increasing starting at 1 for the first row unless
            # it ever wraps around, then it is randomly
            # assigned.  ROWID is a 64 bit integer, so the only
            # way it will wrap is if somebody sets it to a very
            # high number manually.  This library does not do
            # that, so I don't bother checking.
            self.cursor.execute("UPDATE %s SET %s WHERE ROWID > %d" % (self.Name, assignments, self.last_maxrowid))
        self.last_maxrowid = self.maxrowid() or 0
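
# Editorial usage sketch (not part of the original module).  DBTable
# instances are normally created by a content handler prepared with
# use_in() (see below); once a document has been loaded, the tables can be
# queried with SQL and rows rebuilt from the results.  "tbl" here stands
# for any DBTable instance:
#
#   >>> cursor = tbl.connection.cursor()
#   >>> for values in cursor.execute("SELECT * FROM %s" % tbl.Name):
#   ...     row = tbl.row_from_cols(values)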


#
# =============================================================================
#
#                                  LSC Tables
#
# =============================================================================
#


class ProcessParamsTable(DBTable):
    tableName = lsctables.ProcessParamsTable.tableName
    validcolumns = lsctables.ProcessParamsTable.validcolumns
    constraints = lsctables.ProcessParamsTable.constraints
    next_id = lsctables.ProcessParamsTable.next_id
    RowType = lsctables.ProcessParamsTable.RowType
    how_to_index = lsctables.ProcessParamsTable.how_to_index

    def append(self, row):
        if row.type is not None and row.type not in ligolwtypes.Types:
            raise ligolw.ElementError("unrecognized type '%s'" % row.type)
        DBTable.append(self, row)


class TimeSlideTable(DBTable):
    tableName = lsctables.TimeSlideTable.tableName
    validcolumns = lsctables.TimeSlideTable.validcolumns
    constraints = lsctables.TimeSlideTable.constraints
    next_id = lsctables.TimeSlideTable.next_id
    RowType = lsctables.TimeSlideTable.RowType
    how_to_index = lsctables.TimeSlideTable.how_to_index

    def as_dict(self):
        """
        Return a dictionary mapping time slide IDs to offset
        dictionaries.
        """
        return dict((ilwd.ilwdchar(time_slide_id), offsetvector.offsetvector((instrument, offset) for time_slide_id, instrument, offset in values)) for time_slide_id, values in itertools.groupby(self.cursor.execute("SELECT time_slide_id, instrument, offset FROM time_slide ORDER BY time_slide_id"), lambda row: row[0]))

    def get_time_slide_id(self, offsetdict, create_new = None, superset_ok = False, nonunique_ok = False):
        """
        Return the time_slide_id corresponding to the offset vector
        described by offsetdict, a dictionary of instrument/offset
        pairs.

        If the optional create_new argument is None (the default),
        then the table must contain a matching offset vector.  The
        return value is the ID of that vector.  If the table does
        not contain a matching offset vector then KeyError is
        raised.

        If the optional create_new argument is set to a Process
        object (or any other object with a process_id attribute),
        then if the table does not contain a matching offset vector
        a new one will be added to the table and marked as having
        been created by the given process.  The return value is the
        ID of the (possibly newly created) matching offset vector.

        If the optional superset_ok argument is False (the default)
        then an offset vector in the table is considered to "match"
        the requested offset vector only if they contain the exact
        same set of instruments.  If the superset_ok argument is
        True, then an offset vector in the table is considered to
        match the requested offset vector as long as it provides
        the same offsets for the same instruments as the requested
        vector, even if it provides offsets for other instruments
        as well.

        More than one offset vector in the table might match the
        requested vector.  If the optional nonunique_ok argument is
        False (the default), then KeyError will be raised if more
        than one offset vector in the table is found to match the
        requested vector.  If the optional nonunique_ok is True
        then the return value is the ID of one of the matching
        offset vectors selected at random.
        """
        # look for matching offset vectors
        if superset_ok:
            ids = [id for id, slide in self.as_dict().items() if offsetdict == dict((instrument, offset) for instrument, offset in slide.items() if instrument in offsetdict)]
        else:
            ids = [id for id, slide in self.as_dict().items() if offsetdict == slide]
        if len(ids) > 1:
            # found more than one
            if nonunique_ok:
                # and that's OK
                return ids[0]
            # and that's not OK
            raise KeyError(offsetdict)
        if len(ids) == 1:
            # found one
            return ids[0]
        # offset vector not found in table
        if create_new is None:
            # and that's not OK
            raise KeyError(offsetdict)
        # that's OK, create new vector
        id = self.get_next_id()
        for instrument, offset in offsetdict.items():
            row = self.RowType()
            row.process_id = create_new.process_id
            row.time_slide_id = id
            row.instrument = instrument
            row.offset = offset
            self.append(row)

        # return new ID
        return id
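
# Editorial usage sketch (not part of the original module;
# "time_slide_table" and "process" are hypothetical, the latter being any
# object with a process_id attribute):
#
#   >>> offsets = {"H1": 0.0, "L1": 5.0}
#   >>> time_slide_id = time_slide_table.get_time_slide_id(offsets, create_new = process)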


#
# =============================================================================
#
#                                Table Metadata
#
# =============================================================================
#


def build_indexes(connection, verbose = False):
    """
    Using the how_to_index annotations in the table class definitions,
    construct a set of indexes for the database at the given
    connection.
    """
    cursor = connection.cursor()
    for table_name in get_table_names(connection):
        # FIXME: figure out how to do this extensibly
        if table_name in TableByName:
            how_to_index = TableByName[table_name].how_to_index
        elif table_name in lsctables.TableByName:
            how_to_index = lsctables.TableByName[table_name].how_to_index
        else:
            continue
        if how_to_index is not None:
            if verbose:
                sys.stderr.write("indexing %s table ...\n" % table_name)
            for index_name, cols in six.iteritems(how_to_index):
                cursor.execute("CREATE INDEX IF NOT EXISTS %s ON %s (%s)" % (index_name, table_name, ",".join(cols)))
    connection.commit()
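
# Editorial usage sketch ("data.sqlite" is a hypothetical file):
#
#   >>> import sqlite3
#   >>> connection = sqlite3.connect("data.sqlite")
#   >>> build_indexes(connection, verbose = True)
#   indexing process table ...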


#
# =============================================================================
#
#                                Table Metadata
#
# =============================================================================
#


#
# Table name ---> table type mapping.
#


TableByName = {
    ProcessParamsTable.tableName: ProcessParamsTable,
    TimeSlideTable.tableName: TimeSlideTable
}


#
# =============================================================================
#
#                                Content Handler
#
# =============================================================================
#


#
# Override portions of a ligolw.LIGOLWContentHandler class
#


def use_in(ContentHandler):
    """
    Modify ContentHandler, a sub-class of
    glue.ligolw.LIGOLWContentHandler, to cause it to use the DBTable
    class defined in this module when parsing XML documents.  Instances
    of the class must provide a connection attribute.  When a document
    is parsed, the value of this attribute will be passed to the
    DBTable class' .__init__() method as each table object is created,
    and thus sets the database connection for all table objects in the
    document.

    Example:

    >>> import sqlite3
    >>> from glue.ligolw import ligolw
    >>> class MyContentHandler(ligolw.LIGOLWContentHandler):
    ...     def __init__(self, *args):
    ...         super(MyContentHandler, self).__init__(*args)
    ...         self.connection = sqlite3.connect(":memory:")
    ...
    >>> use_in(MyContentHandler)

    Multiple database files can be in use at once by creating a content
    handler class for each one.
    """
    ContentHandler = lsctables.use_in(ContentHandler)

    def startTable(self, parent, attrs):
        name = table.Table.TableName(attrs[u"Name"])
        if name in TableByName:
            return TableByName[name](attrs, connection = self.connection)
        return DBTable(attrs, connection = self.connection)

    ContentHandler.startTable = startTable

    return ContentHandler
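
# End-to-end sketch (editorial addition; "input.xml.gz" is a hypothetical
# file, and MyContentHandler is the class from the example above).
# glue.ligolw.utils.load_filename() accepts a contenthandler keyword
# argument, so a document can be parsed directly into the database:
#
#   >>> from glue.ligolw import utils as ligolw_utils
#   >>> xmldoc = ligolw_utils.load_filename("input.xml.gz", contenthandler = MyContentHandler, verbose = True)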