Package glue :: Module pipeline
[hide private]
[frames] | no frames]

Source Code for Module glue.pipeline

   1  """ 
   2  This module contains objects that make it simple for the user to 
   3  create python scripts that build Condor DAGs to run code on the LSC 
   4  Data Grid. 
   5   
   6  This file is part of the Grid LSC User Environment (GLUE) 
   7   
   8  GLUE is free software: you can redistribute it and/or modify it under the 
   9  terms of the GNU General Public License as published by the Free Software 
  10  Foundation, either version 3 of the License, or (at your option) any later 
  11  version. 
  12   
  13  This program is distributed in the hope that it will be useful, but WITHOUT 
  14  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  15  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 
  16  details. 
  17   
  18  You should have received a copy of the GNU General Public License along with 
  19  this program.  If not, see <http://www.gnu.org/licenses/>. 
  20  """ 
  21   
  22  from __future__ import print_function 
  23  __author__ = 'Duncan Brown <duncan@gravity.phys.uwm.edu>' 
  24  from glue import git_version 
  25  __date__ = git_version.date 
  26  __version__ = git_version.id 
  27   
  28  import os 
  29  import sys 
  30  import string, re 
  31  import time 
  32  import random 
  33  import math 
  34  from six.moves import urllib 
  35  import stat 
  36  import socket 
  37  import itertools 
  38  import glue.segments 
  39  from hashlib import md5 
  40   
  41  try: 
  42    from cjson import decode 
  43  except ImportError: 
  44    from json import loads as decode 
  45   
  46  try: 
  47    import six.moves.http_client 
  48  except: 
  49    pass 
  50   
  51  import StringIO 
  52  import six.moves.configparser 
  53   
  54   
  55  # Some scripts that are used to set up a pegasus DAX 
  56  PEGASUS_SCRIPT="""#!/bin/bash 
  57  TMP_EXEC_DIR=%s 
  58  IHOPE_RUN_DIR=%s 
  59  UEBER_CONCRETE_DAG=%s 
  60  usage() 
  61  { 
  62    echo "Usage: pegasus_submit_dag [-f] [-h]" 
  63    echo 
  64    echo "  -f, --force           Force re-plan and resubmit of DAX" 
  65    echo "  -h, --help            Print this message" 
  66    echo 
  67  } 
  68   
  69  if [ $# -gt 1 ] ; then 
  70    usage 
  71    exit 1 
  72  fi 
  73   
  74  if [ $# -eq 1 ] ; then 
  75   if [ $1 = "-h" ] || [ $1 = "--help" ] ; then 
  76     usage 
  77     exit 0 
  78   fi 
  79   if [ $1 = "-f" ] || [ $1 = "--force" ] ; then 
  80     echo "WARNING: removing any existing workflow files!" 
  81     pegasus-remove ${TMP_EXEC_DIR}/. 
  82     echo "Sleeping for 60 seconds to give running DAGs chance to exit..." 
  83     sleep 60 
  84     rm -rf ${TMP_EXEC_DIR} 
  85     mkdir ${TMP_EXEC_DIR} 
  86     chmod 755 ${TMP_EXEC_DIR} 
  87   else 
  88     usage 
  89     exit 1 
  90   fi 
  91  fi 
  92  if [ -f ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG}.lock ] ; then 
  93    echo 
  94    echo "ERROR: A dagman lock file already exists which may indicate that your" 
  95    echo "workflow is already running. Please check the status of your DAX with" 
  96    echo 
  97    echo "    pegasus-status ${TMP_EXEC_DIR}/." 
  98    echo 
  99    echo "If necessary, you can remove the workflow with" 
 100    echo 
 101    echo "    pegasus-remove ${TMP_EXEC_DIR}/." 
 102    echo 
 103    echo "You can also run" 
 104    echo 
 105    echo "    pegasus_submit_dax -f" 
 106    echo 
 107    echo "to force the workflow to re-run. This will remove any existing" 
 108    echo "workflow log and error files. If these need to be preserved," 
 109    echo "you must back them up first before running with -f." 
 110    echo 
 111    exit 1 
 112  fi 
 113   
 114  # The theory here is to find the longest living 
 115  # proxy certificate and copy it into the default 
 116  # location so that workflows can find it even after 
 117  # the user has logged out. This is necessary because 
 118  # Condor and condor_dagman do not yet properly cache 
 119  # and manage credentials to make them available to all 
 120  # jobs in the workflow, and other tools make assumptions 
 121  # about where a proxy, service, or user certificate is located 
 122  # on the file system and do not properly find valid 
 123  # existing credentials using the proper GSI search algorithm. 
 124  # 
 125  # This is not a great solution because there can be quite 
 126  # valid reasons to have multiple credentials with different 
 127  # lifetimes, and it presents a security risk to circumvent 
 128  # the state and move files around without knowing why the 
 129  # state is the way it is. 
 130  # 
 131  # But to support LIGO users we are doing it at this time 
 132  # until better tooling is available. 
 133   
 134  # 
 135  # Assumes grid-proxy-info is in PATH 
 136   
 137  if ! `/usr/bin/which grid-proxy-info > /dev/null 2>&1` ; then 
 138      echo "ERROR: cannot find grid-proxy-info in PATH"; 
 139          exit 1 
 140          fi 
 141   
 142          # default location for proxy certificates based on uid 
 143         x509_default="/tmp/x509up_u`id -u`" 
 144   
 145   
 146   
 147  # if X509_USER_PROXY is defined and has a lifetime of > 1 hour 
 148  # compare to any existing default and copy it into place if 
 149  # and only if its lifetime is greater than the default 
 150   
 151  if [ -n "$X509_USER_PROXY" ] ; then 
 152      echo "X509_USER_PROXY=${X509_USER_PROXY}" 
 153      if `grid-proxy-info -file ${X509_USER_PROXY} -exists -valid 1:0` ; then 
 154          nsec=`grid-proxy-info -file ${X509_USER_PROXY} -timeleft` 
 155          echo "Lifetime of ${X509_USER_PROXY} ${nsec} seconds" 
 156          if [ -e ${x509_default} ] ; then 
 157              echo "Proxy exists at default location" 
 158              if `grid-proxy-info -file ${x509_default} -exists -valid 1:0` ; then 
 159                  nsec=`grid-proxy-info -file ${X509_USER_PROXY} -timeleft` 
 160                  echo "Lifetime of default ${nsec} seconds" 
 161                  env_life=`grid-proxy-info -file ${X509_USER_PROXY} -timeleft` 
 162                  def_life=`grid-proxy-info -file ${x509_default} -timeleft` 
 163                  if [ ${env_life} -gt ${def_life} ] ; then 
 164                      cp ${X509_USER_PROXY} ${x509_default} 
 165                      echo "Lifetime of ${X509_USER_PROXY} > default" 
 166                      echo "Copied ${X509_USER_PROXY} into default location" 
 167                  else 
 168                      echo "Lifetime of default > ${X509_USER_PROXY}" 
 169                      echo "Leaving default in place" 
 170                  fi 
 171              else 
 172                  echo "Lifetime of default < 1 hour" 
 173                  cp ${X509_USER_PROXY} ${x509_default} 
 174                  echo "Lifetime of ${X509_USER_PROXY} > default" 
 175                  echo "Copied ${X509_USER_PROXY} into default location" 
 176              fi 
 177          else 
 178              echo "No proxy at default location" 
 179              cp ${X509_USER_PROXY} $x509_default 
 180              echo "Copied ${X509_USER_PROXY} into default location" 
 181          fi 
 182      else 
 183          echo "Lifetime of ${X509_USER_PROXY} < 1 hour" 
 184          echo "Ignoring ${X509_USER_PROXY}" 
 185          echo "Assuming default location for proxy" 
 186      fi 
 187  else 
 188      echo "X509_USER_PROXY not set" 
 189      echo "Assuming default location for proxy" 
 190  fi 
 191   
 192  # when we get here we can assume that if a valid proxy with lifetime > 1 exists 
 193  # then it is in the default location, so test for it now 
 194   
 195  valid=`grid-proxy-info -file ${x509_default} -exists -valid 1:0 > /dev/null 2>&1` 
 196  if ! ${valid} ; then 
 197      echo "ERROR: could not find proxy with lifetime > 1 hour" 
 198      exit 1 
 199  fi 
 200   
 201  # if we get here valid proxy with lifetime > 1 hour was 
 202  # found so print out details for the record 
 203  grid-proxy-info -file ${x509_default} -all 
 204   
 205  # set X509_USER_PROXY to the default now 
 206  X509_USER_PROXY=${x509_default} 
 207  export X509_USER_PROXY 
 208   
 209  # set specific condor variables needed by pegasus 
 210   
 211  export _CONDOR_DAGMAN_LOG_ON_NFS_IS_ERROR=FALSE 
 212  export _CONDOR_DAGMAN_COPY_TO_SPOOL=False 
 213   
 214  if [ -f ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG} ] ; then 
 215    pegasus-run --conf ${IHOPE_RUN_DIR}/pegasus.properties ${TMP_EXEC_DIR}/. 
 216  else 
 217    pegasus-plan --conf ${IHOPE_RUN_DIR}/pegasus.properties \\ 
 218                 --dax %s \\ 
 219                 --dir ${TMP_EXEC_DIR} \\ 
 220                 %s -s %s --nocleanup -f --submit 
 221   
 222    ln -sf ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG}.dagman.out ${UEBER_CONCRETE_DAG}.dagman.out 
 223  fi 
 224  """ 
 225   
 226  PEGASUS_BASEDIR_SCRIPT=""" 
 227  #!/bin/bash 
 228   
 229  TMP_EXEC_DIR=%s 
 230  UEBER_CONCRETE_DAG=%s 
 231   
 232  usage() 
 233  { 
 234    echo "Usage: pegasus_basedir [-d]" 
 235    echo 
 236    echo "Prints the name of the Pegasus basedir where the condor files can be found" 
 237    echo 
 238    echo "  -d, --dag             Append the name of the concrete DAG to the basedir" 
 239    echo "  -h, --help            Print this message" 
 240    echo 
 241  } 
 242   
 243  if [ $# -gt 1 ] ; then 
 244    usage 
 245    exit 1 
 246  fi 
 247   
 248   
 249  if [ $# -eq 1 ] ; then 
 250   if [ $1 = "-h" ] || [ $1 = "--help" ] ; then 
 251     usage 
 252     exit 0 
 253   fi 
 254   if [ $1 = "-d" ] || [ $1 = "--dag" ] ; then 
 255     echo ${TMP_EXEC_DIR}/${UEBER_CONCRETE_DAG} 
 256     exit 0 
 257   else 
 258     usage 
 259     exit 1 
 260   fi 
 261  fi 
 262   
 263  echo ${TMP_EXEC_DIR} 
 264  exit 0 
 265  """ 
 266   
 267  PEGASUS_PROPERTIES= """ 
 268  ############################################################################### 
 269  # Pegasus properties file generated by pipeline 
 270  # 
 271  ############################################################################### 
 272  # Catalog Properties 
 273   
 274  # Specifies which type of replica catalog to use during the planning process 
 275  # In File mode, Pegasus queries a file based replica catalog. 
 276  pegasus.catalog.replica=File 
 277   
 278  # Sets the location of the site catalog file that contains the description of 
 279  # the sites available to run jobs. 
 280  pegasus.catalog.site.file=%s/sites.xml 
 281   
 282   
 283  ############################################################################### 
 284  # Transfer Configuration Properties 
 285   
 286  # If pegasus sees a pool attribute in the replica catalog associated with the 
 287  # PFN that matches the execution pool, return the PFN as a file URL, so that 
 288  # the transfer executable uses ln to link the input files. 
 289  pegasus.transfer.links=true 
 290   
 291   
 292  ############################################################################### 
 293  # Dagman Profile Namespace 
 294   
 295  # sets the maximum number of PRE scripts within the DAG that may be running at 
 296  # one time (including pegasus-plan, which is run as a pre script). 
 297  dagman.maxpre=1 
 298   
 299  # number of times DAGMan retries the full job cycle from pre-script through 
 300  # post-script, if failure was detected 
 301  dagman.retry=3 
 302   
 303   
 304  ############################################################################### 
 305  # Site Selection Properties 
 306   
 307  # Jobs will be assigned in a round robin manner amongst the sites that can 
 308  # execute them. 
 309  pegasus.selector.site=RoundRobin 
 310   
 311   
 312  ############################################################################### 
 313  # Site Directories 
 314   
 315  # While creating the submit directory use a timestamp based numbering scheme 
 316  # instead of the default runxxxx scheme. 
 317  pegasus.dir.useTimestamp=true 
 318   
 319   
 320  ############################################################################### 
 321  # Directory Properties 
 322   
 323  # Use directory labels for sub-workflows (needed for ihope) 
 324  pegasus.dir.submit.subwf.labelbased=true 
 325   
 326   
 327  """ 
 328   
 329   
def s2play(t):
  """
  Tell whether a GPS time lies in the S2 playground.

  @param t: GPS time to test.
  @return: True if t is in the S2 playground, False otherwise.
  """
  # Position of t inside the repeating 6370 second cycle that is counted
  # from GPS time 729273613; the first 600 seconds of each cycle qualify.
  offset_in_cycle = (t - 729273613) % 6370
  return offset_in_cycle < 600
336 337 # FIXME convenience function until pegasus does this for us
def recurse_pfn_cache(node,caches=None):
  """
  Collect the cache file paths of the DAX-backed DAGMan jobs found among the
  ancestors of a node.

  Each parent whose job is a CondorDAGManJob with a DAX contributes
  <pegasus exec dir>/<dax name without its last extension>_0.cache, after
  that parent's own ancestors have been visited.

  @param node: node whose parents are walked.
  @param caches: list to append to; it is modified in place and returned.
  When omitted a new empty list is created for this call, so results are
  never shared between separate calls.
  @return: the list of cache file paths.
  """
  if caches is None:
    caches = []

  for parent in node._CondorDAGNode__parents:
    parent_job = parent.job()
    if not isinstance(parent_job, CondorDAGManJob):
      continue
    dax = parent_job.get_dax()
    if dax is None:
      continue

    # ancestors first, so their caches precede this parent's cache
    caches = recurse_pfn_cache(parent,caches)
    dax_name = os.path.basename(dax)
    dax_basename = '.'.join(dax_name.split('.')[0:-1])
    caches.append( os.path.join(
      parent_job.get_pegasus_exec_dir(), dax_basename + '_0.cache') )

  return caches
350 351
class CondorError(Exception):
  """Error thrown by Condor Jobs"""
  def __init__(self, args=None):
    """
    @param args: the error message, or a tuple/list of exception arguments.
    May be omitted.
    """
    # Assigning a string straight to self.args splits it into a tuple of
    # single characters, and assigning None fails, so hand the value to the
    # Exception initialiser in the form it expects.
    if args is None:
      Exception.__init__(self)
    elif isinstance(args, (tuple, list)):
      Exception.__init__(self, *args)
    else:
      Exception.__init__(self, args)
class CondorJobError(CondorError):
  """CondorError subtype; adds no behaviour of its own."""

class CondorSubmitError(CondorError):
  """CondorError subtype raised by CondorJob.write_sub_file()."""

class CondorDAGError(CondorError):
  """CondorError subtype; adds no behaviour of its own."""

class CondorDAGJobError(CondorError):
  """CondorError subtype raised by CondorDAGJob.add_var_arg()."""

class CondorDAGNodeError(CondorError):
  """CondorError subtype raised when a CondorDAGNode is given a bad job."""
class SegmentError(Exception):
  """
  Exception type for segment problems (its raise sites are not in this
  part of the file).
  """
  def __init__(self, args=None):
    """
    @param args: the error message, or a tuple/list of exception arguments.
    May be omitted.
    """
    # Assigning a string straight to self.args splits it into a tuple of
    # single characters, and assigning None fails, so hand the value to the
    # Exception initialiser in the form it expects.
    if args is None:
      Exception.__init__(self)
    elif isinstance(args, (tuple, list)):
      Exception.__init__(self, *args)
    else:
      Exception.__init__(self, args)
369 370
class CondorJob:
  """
  Generic condor job class. Provides methods to set the options in the
  condor submit file for a particular executable.
  """
  def __init__(self, universe, executable, queue):
    """
    @param universe: the condor universe to run the job in.
    @param executable: the executable to run.
    @param queue: number of jobs to queue.
    """
    self.__universe = universe
    self.__executable = executable
    self.__queue = queue

    # These are set by methods in the class
    self.__options = {}
    self.__short_options = {}
    self.__arguments = []
    self.__condor_cmds = {}
    self.__notification = None
    self.__log_file = None
    self.__in_file = None
    self.__err_file = None
    self.__out_file = None
    self.__sub_file_path = None
    self.__output_files = []
    self.__input_files = []
    self.__checkpoint_files = []
    self.__grid_type = None
    self.__grid_server = None
    self.__grid_scheduler = None
    self.__executable_installed = True

  def get_executable(self):
    """
    Return the name of the executable for this job.
    """
    return self.__executable

  def set_executable(self, executable):
    """
    Set the name of the executable for this job.
    @param executable: the executable to run.
    """
    self.__executable = executable

  def get_universe(self):
    """
    Return the condor universe that the job will run in.
    """
    return self.__universe

  def set_universe(self, universe):
    """
    Set the condor universe for the job to run in.
    @param universe: the condor universe to run the job in.
    """
    self.__universe = universe

  def get_grid_type(self):
    """
    Return the grid type of the job.
    """
    return self.__grid_type

  def set_grid_type(self, grid_type):
    """
    Set the type of grid resource for the job.
    @param grid_type: type of grid resource ('gt2' and 'gt4' are the values
    accepted by write_sub_file()).
    """
    self.__grid_type = grid_type

  def get_grid_server(self):
    """
    Return the grid server on which the job will run.
    """
    return self.__grid_server

  def set_grid_server(self, grid_server):
    """
    Set the grid server on which to run the job.
    @param grid_server: grid server on which to run.
    """
    self.__grid_server = grid_server

  def get_grid_scheduler(self):
    """
    Return the grid scheduler.
    """
    return self.__grid_scheduler

  def set_grid_scheduler(self, grid_scheduler):
    """
    Set the grid scheduler.
    @param grid_scheduler: grid scheduler on which to run.
    """
    self.__grid_scheduler = grid_scheduler

  def set_executable_installed(self,installed):
    """
    If executable installed is true, then no copying of the executable is
    done. If it is false, pegasus stages the executable to the remote site.
    Default is executable is installed (i.e. True).
    @param installed: true or false
    """
    self.__executable_installed = installed

  def get_executable_installed(self):
    """
    Return whether or not the executable is installed.
    """
    return self.__executable_installed

  def add_condor_cmd(self, cmd, value):
    """
    Add a Condor command to the submit file (e.g. a class ad or environment).
    @param cmd: Condor command directive.
    @param value: value for command.
    """
    self.__condor_cmds[cmd] = value

  def get_condor_cmds(self):
    """
    Return the dictionary of condor keywords to add to the job
    """
    return self.__condor_cmds

  def add_input_file(self, filename):
    """
    Add filename as a necessary input file for this job. A name that is
    already listed is not added again.

    @param filename: input filename to add
    """
    if filename not in self.__input_files:
      self.__input_files.append(filename)

  def add_output_file(self, filename):
    """
    Add filename as an output file for this job. A name that is already
    listed is not added again.

    @param filename: output filename to add
    """
    if filename not in self.__output_files:
      self.__output_files.append(filename)

  def add_checkpoint_file(self, filename):
    """
    Add filename as a checkpoint file for this job. A name that is already
    listed is not added again.

    @param filename: checkpoint filename to add
    """
    if filename not in self.__checkpoint_files:
      self.__checkpoint_files.append(filename)

  def get_input_files(self):
    """
    Return list of input files for this job.
    """
    return self.__input_files

  def get_output_files(self):
    """
    Return list of output files for this job.
    """
    return self.__output_files

  def get_checkpoint_files(self):
    """
    Return a list of checkpoint files for this job.
    """
    return self.__checkpoint_files

  def add_arg(self, arg):
    """
    Add an argument to the executable. Arguments are appended after any
    options and their order is guaranteed.
    @param arg: argument to add.
    """
    self.__arguments.append(arg)

  def add_file_arg(self, filename):
    """
    Add a file argument to the executable. Arguments are appended after any
    options and their order is guaranteed. Also adds the file name to the
    list of required input data for this job.
    @param filename: file to add as argument.
    """
    self.__arguments.append(filename)
    if filename not in self.__input_files:
      self.__input_files.append(filename)

  def get_args(self):
    """
    Return the list of arguments that are to be passed to the executable.
    """
    return self.__arguments

  def add_opt(self, opt, value):
    """
    Add a command line option to the executable. The order that the arguments
    will be appended to the command line is not guaranteed, but they will
    always be added before any command line arguments. The name of the option
    is prefixed with double hyphen and the program is expected to parse it
    with getopt_long().
    @param opt: command line option to add.
    @param value: value to pass to the option (None for no argument).
    """
    self.__options[opt] = value

  def get_opt( self, opt):
    """
    Returns the value associated with the given command line option.
    Returns None if the option does not exist in the options list.
    @param opt: command line option
    """
    return self.__options.get(opt)

  def add_file_opt(self, opt, filename):
    """
    Add a command line option to the executable. The order that the arguments
    will be appended to the command line is not guaranteed, but they will
    always be added before any command line arguments. The name of the option
    is prefixed with double hyphen and the program is expected to parse it
    with getopt_long(). Also adds the file name to the list of required
    input data for this job.
    @param opt: command line option to add.
    @param filename: file name to pass to the option.
    """
    self.__options[opt] = filename
    if filename not in self.__input_files:
      self.__input_files.append(filename)

  def get_opts(self):
    """
    Return the dictionary of opts for the job.
    """
    return self.__options

  def add_short_opt(self, opt, value):
    """
    Add a command line option to the executable. The order that the arguments
    will be appended to the command line is not guaranteed, but they will
    always be added before any command line arguments. The name of the option
    is prefixed with single hyphen and the program is expected to parse it
    with getopt() or getopt_long() (if a single character option), or
    getopt_long_only() (if multiple characters). Long and (single-character)
    short options may be mixed if the executable permits this.
    @param opt: command line option to add.
    @param value: value to pass to the option (None for no argument).
    """
    self.__short_options[opt] = value

  def get_short_opts(self):
    """
    Return the dictionary of short options for the job.
    """
    return self.__short_options

  def add_ini_opts(self, cp, section):
    """
    Parse command line options from a given section in an ini file and
    pass to the executable.
    @param cp: ConfigParser object pointing to the ini file.
    @param section: section of the ini file to add to the options.
    """
    for opt in cp.options(section):
      # str.strip() rather than string.strip(): the module-level function
      # does not exist on Python 3
      self.__options[opt] = cp.get(section,opt).strip()

  def set_notification(self, value):
    """
    Set the email address to send notification to.
    @param value: email address or never for no notification.
    """
    self.__notification = value

  def set_log_file(self, path):
    """
    Set the Condor log file.
    @param path: path to log file.
    """
    self.__log_file = path

  def set_stdin_file(self, path):
    """
    Set the file from which Condor directs the stdin of the job.
    @param path: path to stdin file.
    """
    self.__in_file = path

  def get_stdin_file(self):
    """
    Get the file from which Condor directs the stdin of the job.
    """
    return self.__in_file

  def set_stderr_file(self, path):
    """
    Set the file to which Condor directs the stderr of the job.
    @param path: path to stderr file.
    """
    self.__err_file = path

  def get_stderr_file(self):
    """
    Get the file to which Condor directs the stderr of the job.
    """
    return self.__err_file

  def set_stdout_file(self, path):
    """
    Set the file to which Condor directs the stdout of the job.
    @param path: path to stdout file.
    """
    self.__out_file = path

  def get_stdout_file(self):
    """
    Get the file to which Condor directs the stdout of the job.
    """
    return self.__out_file

  def set_sub_file(self, path):
    """
    Set the name of the file to write the Condor submit file to when
    write_sub_file() is called.
    @param path: path to submit file.
    """
    self.__sub_file_path = path

  def get_sub_file(self):
    """
    Get the name of the file which the Condor submit file will be
    written to when write_sub_file() is called.
    """
    return self.__sub_file_path

  def write_sub_file(self):
    """
    Write a submit file for this Condor job.

    @raise CondorSubmitError: if the log, stderr, stdout or submit file path
    has not been set, if the grid universe settings are missing or
    unsupported, or if the submit file cannot be opened for writing.
    """
    if not self.__log_file:
      raise CondorSubmitError("Log file not specified.")
    if not self.__err_file:
      raise CondorSubmitError("Error file not specified.")
    if not self.__out_file:
      raise CondorSubmitError("Output file not specified.")

    if not self.__sub_file_path:
      raise CondorSubmitError('No path for submit file.')

    # Check the grid settings before the file is opened so that a rejected
    # job neither truncates an existing submit file nor leaks an open handle.
    if self.__universe == 'grid':
      if self.__grid_type is None:
        raise CondorSubmitError('No grid type specified.')
      elif self.__grid_type == 'gt2':
        if self.__grid_server is None:
          raise CondorSubmitError('No server specified for grid resource.')
      elif self.__grid_type == 'gt4':
        if self.__grid_server is None:
          raise CondorSubmitError('No server specified for grid resource.')
        if self.__grid_scheduler is None:
          raise CondorSubmitError('No scheduler specified for grid resource.')
      else:
        raise CondorSubmitError('Unsupported grid resource.')

    try:
      subfile = open(self.__sub_file_path, 'w')
    except (IOError, OSError):
      raise CondorSubmitError("Cannot open file " + self.__sub_file_path)

    # the with block closes the file even if one of the writes fails
    with subfile:
      subfile.write( 'universe = ' + self.__universe + '\n' )
      subfile.write( 'executable = ' + self.__executable + '\n' )

      if self.__universe == 'grid':
        if self.__grid_type == 'gt2':
          subfile.write('grid_resource = %s %s\n' % (self.__grid_type,
            self.__grid_server))
        if self.__grid_type == 'gt4':
          subfile.write('grid_resource = %s %s %s\n' % (self.__grid_type,
            self.__grid_server, self.__grid_scheduler))

        subfile.write('when_to_transfer_output = ON_EXIT\n')
        subfile.write('transfer_output_files = $(macrooutput)\n')
        subfile.write('transfer_input_files = $(macroinput)\n')

      if self.__options or self.__short_options or self.__arguments:
        # long options, then short options, then positional arguments
        subfile.write( 'arguments = "' )
        for opt, value in self.__options.items():
          if value:
            subfile.write( ' --' + opt + ' ' + value )
          else:
            subfile.write( ' --' + opt )
        for opt, value in self.__short_options.items():
          if value:
            subfile.write( ' -' + opt + ' ' + value )
          else:
            subfile.write( ' -' + opt )
        for arg in self.__arguments:
          subfile.write( ' ' + arg )
        subfile.write( ' "\n' )

      for cmd, value in self.__condor_cmds.items():
        subfile.write( str(cmd) + " = " + str(value) + '\n' )

      subfile.write( 'log = ' + self.__log_file + '\n' )
      if self.__in_file is not None:
        subfile.write( 'input = ' + self.__in_file + '\n' )
      subfile.write( 'error = ' + self.__err_file + '\n' )
      subfile.write( 'output = ' + self.__out_file + '\n' )
      if self.__notification:
        subfile.write( 'notification = ' + self.__notification + '\n' )
      subfile.write( 'queue ' + str(self.__queue) + '\n' )
784 785 786
class CondorDAGJob(CondorJob):
  """
  Condor job intended for use inside a DAG. Its notification is set to
  'never', one job is queued, and it can carry variable (macro) options,
  arguments and condor commands whose values are supplied per DAG node.
  Inherits methods from a CondorJob.
  """
  def __init__(self, universe, executable):
    """
    @param universe: the condor universe to run the job in.
    @param executable: the executable to run in the DAG.
    """
    CondorJob.__init__(self, universe, executable, 1)
    CondorJob.set_notification(self, 'never')

    # names already registered as macros, kept so each is added only once
    self.__var_opts = []
    self.__var_cmds = []
    self.__var_args = []
    # index that the next new variable argument must have
    self.__arg_index = 0

    self.__grid_site = None
    self.__dax_mpi_cluster = None
    # characters stripped from a name when it is turned into a macro name
    self.__bad_macro_chars = re.compile(r'[_-]')

  def create_node(self):
    """
    Return a new CondorDAGNode built from this job. Jobs that need more
    detailed node set-up are expected to override this; the default is
    adequate for jobs with very simple input/output.
    """
    node = CondorDAGNode(self)
    return node

  def set_grid_site(self,site):
    """
    Set the grid site to run on. If not specified,
    will not give hint to Pegasus. Any site other than 'local' also marks
    the executable as not installed.
    @param site: the grid site; stored as a string.
    """
    self.__grid_site = str(site)
    if site == 'local':
      return
    self.set_executable_installed(False)

  def get_grid_site(self):
    """
    Return the grid site for this job.
    """
    return self.__grid_site

  def set_dax_mpi_cluster(self,size):
    """
    Set the DAX collapse key for this job.
    @param size: value later returned by get_dax_mpi_cluster().
    """
    self.__dax_mpi_cluster = size

  def get_dax_mpi_cluster(self):
    """
    Get the DAX collapse key for this job.
    """
    return self.__dax_mpi_cluster

  def add_var_opt(self, opt, short=False):
    """
    Add a variable (or macro) option to the condor job. The option is added
    to the submit file and a different argument to the option can be set for
    each node in the DAG. An option that was already added is ignored.
    @param opt: name of option to add.
    @param short: if true the option is added as a short (single hyphen)
    option, otherwise as a long option.
    """
    if opt in self.__var_opts:
      return
    self.__var_opts.append(opt)
    macro = self.__bad_macro_chars.sub( r'', opt )
    register = self.add_short_opt if short else self.add_opt
    register(opt,'$(macro' + macro + ')')

  def add_var_condor_cmd(self, command):
    """
    Add a condor command to the submit file whose value is a variable
    (macro) that can be set for each node in the DAG. A command that was
    already added is ignored.
    @param command: name of the condor command to add.
    """
    if command in self.__var_cmds:
      return
    self.__var_cmds.append(command)
    macro = self.__bad_macro_chars.sub( r'', command )
    self.add_condor_cmd(command, '$(macro' + macro + ')')

  def add_var_arg(self,arg_index,quote=False):
    """
    Add a variable (macro) argument to the submit file so that a different
    value can be passed to the executable by each node. Nothing happens if
    the argument at arg_index has already been added.
    @param arg_index: index of the variable argument.
    @param quote: if true the macro is wrapped in single quotes.
    @raise CondorDAGJobError: if arg_index is new but is not the next
    expected index.
    """
    try:
      self.__var_args[arg_index]
    except IndexError:
      if arg_index != self.__arg_index:
        raise CondorDAGJobError("mismatch between job and node var_arg index")
      if quote:
        macro_arg = "'$(macroargument%s)'" % str(arg_index)
      else:
        macro_arg = '$(macroargument%s)' % str(arg_index)
      self.__var_args.append(macro_arg)
      self.add_arg(self.__var_args[self.__arg_index])
      self.__arg_index += 1
886 887
class CondorDAGManJob:
  """
  Condor DAGMan job class, used to set up a DAG that runs within another
  DAG.
  """
  def __init__(self, dag, dir=None, dax=None):
    """
    @param dag: the name of the condor dag file to run.
    @param dir: the directory in which the dag file is located.
    @param dax: the name of an associated dax file, if any.
    """
    self.__dag, self.__dax = dag, dax
    self.__dag_directory = dir
    self.__notification = None
    self.__pegasus_exec_dir = None
    self.__pfn_cache = []

  def create_node(self):
    """
    Return a new CondorDAGManNode built from this job. Jobs that need more
    detailed node set-up are expected to override this; the default is
    adequate for jobs with very simple input/output.
    """
    node = CondorDAGManNode(self)
    return node

  def set_dag_directory(self, dir):
    """
    Set the directory where the dag will be run.
    @param dir: the name of the directory where the dag will be run
    """
    self.__dag_directory = dir

  def get_dag_directory(self):
    """
    Get the directory where the dag will be run.
    """
    return self.__dag_directory

  def set_notification(self, value):
    """
    Set the email address to send notification to.
    @param value: email address or never for no notification.
    """
    self.__notification = value

  def get_sub_file(self):
    """
    Return the name of the dag, which serves as the submit file name for
    the SUBDAG EXTERNAL command in the uber-dag.
    """
    return self.__dag

  def write_sub_file(self):
    """
    Do nothing: no sub file is needed with the SUBDAG EXTERNAL command in
    the uber-dag.
    """
    return None

  def get_dax(self):
    """
    Return the name of any associated dax file.
    """
    return self.__dax

  def get_dag(self):
    """
    Return the name of any associated dag file.
    """
    return self.__dag

  def set_pegasus_exec_dir(self,dir):
    """
    Set the directory in which pegasus will generate all log files.
    @param dir: the directory name.
    """
    self.__pegasus_exec_dir = dir

  def get_pegasus_exec_dir(self):
    """
    Return the directory in which pegasus will generate all log files.
    """
    return self.__pegasus_exec_dir

  def add_pfn_cache(self,pfn_list):
    """
    Append the entries of pfn_list to the pfn cache.
    @param pfn_list: entries to add (lfn, pfn and pool tuples according to
    the original documentation).
    """
    self.__pfn_cache.extend(pfn_list)

  def get_pfn_cache(self):
    """
    Return the pfn cache.
    """
    return self.__pfn_cache
984 985
class CondorDAGNode:
  """
  A CondorDAGNode represents a node in the DAG. It corresponds to a particular
  condor job (and so a particular submit file). If the job has variable
  (macro) options, they can be set here so each node executes with the
  correct options.
  """
  def __init__(self, job):
    """
    @param job: the CondorDAGJob or CondorDAGManJob that this node
    corresponds to.
    @raise CondorDAGNodeError: if job is neither of those types.
    """
    if not isinstance(job, (CondorDAGJob, CondorDAGManJob)):
      raise CondorDAGNodeError(
        "A DAG node must correspond to a Condor DAG job or Condor DAGMan job")
    self.__name = None
    self.__job = job
    self.__category = None
    self.__priority = None
    self.__pre_script = None
    self.__pre_script_args = []
    self.__post_script = None
    self.__post_script_args = []
    self.__macros = {}
    self.__opts = {}
    self.__args = []
    self.__arg_index = 0
    self.__retry = 0
    self.__parents = []
    # underscores and hyphens are stripped from names used as macros
    self.__bad_macro_chars = re.compile(r'[_-]')
    self.__output_files = []
    self.__input_files = []
    self.__checkpoint_files = []
    self.__dax_collapse = None
    self.__vds_group = None
    if isinstance(job,CondorDAGJob) and job.get_universe()=='standard':
      self.__grid_start = 'none'
    else:
      self.__grid_start = None
    self.__pegasus_profile = []

    # generate the md5 node name from the time, a random number and the class
    t = str( int( time.time() * 1000 ) )
    r = str( int( random.random() * 100000000000000000 ) )
    a = str( self.__class__ )
    # hashlib needs bytes on Python 3; the seed is pure ASCII so encoding
    # is also safe on Python 2
    self.__name = md5((t + r + a).encode('utf-8')).hexdigest()
    self.__md5name = self.__name

  def __repr__(self):
    return self.__name

  def job(self):
    """
    Return the CondorJob that this node is associated with.
    """
    return self.__job

  def add_pegasus_profile(self, namespace, key, value):
    """
    Add a Pegasus profile to this job which will be written to the dax as
    <profile namespace="NAMESPACE" key="KEY">VALUE</profile>
    This can be used to add classads to particular jobs in the DAX
    @param namespace: A valid Pegasus namespace, e.g. condor.
    @param key: The name of the attribute.
    @param value: The value of the attribute.
    """
    self.__pegasus_profile.append((str(namespace),str(key),str(value)))

  def get_pegasus_profile(self):
    """
    Return the pegasus profiles for this node as a list of
    (namespace, key, value) string tuples.
    """
    return self.__pegasus_profile

  def set_grid_start(self, gridstart):
    """
    Set the grid starter that pegasus will use. 4.1 options
    are none (the default), kickstart and pegasuslite
    @param gridstart: pegasus.gridstart property
    """
    self.__grid_start = str(gridstart)

  def get_grid_start(self):
    """
    Return the grid starter that pegasus will use.
    """
    return self.__grid_start

  def set_pre_script(self,script):
    """
    Sets the name of the pre script that is executed before the DAG node is
    run.
    @param script: path to script
    """
    self.__pre_script = script

  def add_pre_script_arg(self,arg):
    """
    Adds an argument to the pre script that is executed before the DAG node is
    run.
    @param arg: argument to append
    """
    self.__pre_script_args.append(arg)

  def set_post_script(self,script):
    """
    Sets the name of the post script that is executed after the DAG node
    has run.
    @param script: path to script
    """
    self.__post_script = script

  def get_post_script(self):
    """
    Returns the name of the post script that is executed after the DAG node
    has run.
    """
    return self.__post_script

  def add_post_script_arg(self,arg):
    """
    Adds an argument to the post script that is executed after the DAG node
    has run.
    @param arg: argument to append
    """
    self.__post_script_args.append(arg)

  def get_post_script_arg(self):
    """
    Returns the list of arguments to the post script that is executed after
    the DAG node has run.
    """
    return self.__post_script_args

  def set_name(self,name):
    """
    Set the name for this node in the DAG.
    """
    self.__name = str(name)

  def get_name(self):
    """
    Get the name for this node in the DAG.
    """
    return self.__name

  def set_category(self,category):
    """
    Set the category for this node in the DAG.
    """
    self.__category = str(category)

  def get_category(self):
    """
    Get the category for this node in the DAG.
    """
    return self.__category

  def set_priority(self,priority):
    """
    Set the priority for this node in the DAG.
    """
    self.__priority = str(priority)

  def get_priority(self):
    """
    Get the priority for this node in the DAG.
    """
    return self.__priority

  def add_input_file(self, filename):
    """
    Add filename as a necessary input file for this DAG node.

    @param filename: input filename to add
    """
    if filename not in self.__input_files:
      self.__input_files.append(filename)
      if not isinstance(self.job(), CondorDAGManJob):
        if self.job().get_universe() == 'grid':
          self.add_input_macro(filename)

  def add_output_file(self, filename):
    """
    Add filename as a output file for this DAG node.

    @param filename: output filename to add
    """
    if filename not in self.__output_files:
      self.__output_files.append(filename)
      if not isinstance(self.job(), CondorDAGManJob):
        if self.job().get_universe() == 'grid':
          self.add_output_macro(filename)

  def add_checkpoint_file(self,filename):
    """
    Add filename as a checkpoint file for this DAG node
    @param filename: checkpoint filename to add
    """
    if filename not in self.__checkpoint_files:
      self.__checkpoint_files.append(filename)
      if not isinstance(self.job(), CondorDAGManJob):
        if self.job().get_universe() == 'grid':
          self.add_checkpoint_macro(filename)

  def get_input_files(self):
    """
    Return list of input files for this DAG node and its job.
    """
    input_files = list(self.__input_files)
    if isinstance(self.job(), CondorDAGJob):
      input_files = input_files + self.job().get_input_files()
    return input_files

  def get_output_files(self):
    """
    Return list of output files for this DAG node and its job.
    """
    output_files = list(self.__output_files)
    if isinstance(self.job(), CondorDAGJob):
      output_files = output_files + self.job().get_output_files()
    return output_files

  def get_checkpoint_files(self):
    """
    Return a list of checkpoint files for this DAG node and its job.
    """
    checkpoint_files = list(self.__checkpoint_files)
    if isinstance(self.job(), CondorDAGJob):
      checkpoint_files = checkpoint_files + self.job().get_checkpoint_files()
    return checkpoint_files

  def set_vds_group(self,group):
    """
    Set the name of the VDS group key when generating a DAX
    @param group: name of group for this node
    """
    self.__vds_group = str(group)

  def get_vds_group(self):
    """
    Returns the VDS group key for this node
    """
    return self.__vds_group

  def set_dax_collapse(self,collapse):
    """
    Set the DAX collapse key for this node
    """
    self.__dax_collapse = str(collapse)

  def get_dax_collapse(self):
    """
    Get the DAX collapse key for this node
    """
    return self.__dax_collapse

  def add_macro(self,name,value):
    """
    Add a variable (macro) for this node.  This can be different for
    each node in the DAG, even if they use the same CondorJob.  Within
    the CondorJob, the value of the macro can be referenced as
    '$(name)' -- for instance, to define a unique output or error file
    for each node.
    @param name: macro name.
    @param value: value of the macro for this node in the DAG
    """
    macro = self.__bad_macro_chars.sub( r'', name )
    self.__opts[macro] = value

  def add_io_macro(self,io,filename):
    """
    Add a variable (macro) for storing the input/output files associated
    with this node. File names are accumulated as a comma-separated
    string and each name is stored at most once.
    @param io: macroinput or macrooutput
    @param filename: filename of input/output file
    """
    io = self.__bad_macro_chars.sub( r'', io )
    if io not in self.__opts:
      self.__opts[io] = filename
    # compare against whole entries: a substring test on the joined
    # string would wrongly drop e.g. 'a.txt' when 'ba.txt' is present
    elif filename not in self.__opts[io].split(','):
      self.__opts[io] += ',%s' % filename

  def add_input_macro(self,filename):
    """
    Add a variable (macro) for storing the input files associated with
    this node.
    @param filename: filename of input file
    """
    self.add_io_macro('macroinput', filename)

  def add_output_macro(self,filename):
    """
    Add a variable (macro) for storing the output files associated with
    this node.
    @param filename: filename of output file
    """
    self.add_io_macro('macrooutput', filename)

  def add_checkpoint_macro(self,filename):
    """
    Add a variable (macro) for storing the checkpoint files associated
    with this node.
    @param filename: filename of checkpoint file
    """
    self.add_io_macro('macrocheckpoint',filename)

  def get_opts(self):
    """
    Return the opts for this node. Note that this returns only
    the options for this instance of the node and not those
    associated with the underlying job template.
    """
    return self.__opts

  def add_var_condor_cmd(self, command, value):
    """
    Add a variable (macro) condor command for this node. If the command
    specified does not exist in the CondorJob, it is added so the submit file
    will be correct.
    PLEASE NOTE: AS with other add_var commands, the variable must be set for
    all nodes that use the CondorJob instance.
    @param command: command name
    @param value: Value of the command for this node in the DAG/DAX.
    """
    macro = self.__bad_macro_chars.sub( r'', command )
    self.__macros['macro' + macro] = value
    self.__job.add_var_condor_cmd(command)

  def add_var_opt(self,opt,value,short=False):
    """
    Add a variable (macro) option for this node. If the option
    specified does not exist in the CondorJob, it is added so the submit
    file will be correct when written.
    @param opt: option name.
    @param value: value of the option for this node in the DAG.
    @param short: passed through to the job's add_var_opt.
    """
    macro = self.__bad_macro_chars.sub( r'', opt )
    self.__opts['macro' + macro] = value
    self.__job.add_var_opt(opt,short)

  def add_file_opt(self,opt,filename,file_is_output_file=False):
    """
    Add a variable (macro) option for this node. If the option
    specified does not exist in the CondorJob, it is added so the submit
    file will be correct when written. The value of the option is also
    added to the list of input files for the DAX.
    @param opt: option name.
    @param filename: value of the option for this node in the DAG.
    @param file_is_output_file: A boolean if the file will be an output file
    instead of an input file. The default is to have it be an input.
    """
    self.add_var_opt(opt,filename)
    if file_is_output_file:
      self.add_output_file(filename)
    else:
      self.add_input_file(filename)

  def add_var_arg(self, arg,quote=False):
    """
    Add a variable (or macro) argument to the condor job. The argument is
    added to the submit file and a different value of the argument can be set
    for each node in the DAG.
    @param arg: value of the argument for this node.
    @param quote: passed through to the job's add_var_arg.
    """
    self.__args.append(arg)
    self.__job.add_var_arg(self.__arg_index,quote=quote)
    self.__arg_index += 1

  def add_file_arg(self, filename):
    """
    Add a variable (or macro) file name argument to the condor job. The
    argument is added to the submit file and a different value of the
    argument can be set for each node in the DAG. The file name is also
    added to the list of input files for the DAX.
    @param filename: name of option to add.
    """
    self.add_input_file(filename)
    self.add_var_arg(filename)

  def get_args(self):
    """
    Return the arguments for this node. Note that this returns
    only the arguments for this instance of the node and not those
    associated with the underlying job template.
    """
    return self.__args

  def set_retry(self, retry):
    """
    Set the number of times that this node in the DAG should retry.
    @param retry: number of times to retry node.
    """
    self.__retry = retry

  def get_retry(self):
    """
    Return the number of times that this node in the DAG should retry.
    """
    return self.__retry

  def write_job(self,fh):
    """
    Write the DAG entry for this node's job to the DAG file descriptor.
    @param fh: descriptor of open DAG file.
    """
    if isinstance(self.job(),CondorDAGManJob):
      # create an external subdag from this dag
      fh.write( ' '.join(
        ['SUBDAG EXTERNAL', self.__name, self.__job.get_sub_file()]) )
      if self.job().get_dag_directory():
        fh.write( ' DIR ' + self.job().get_dag_directory() )
    else:
      # write a regular condor job
      fh.write( 'JOB ' + self.__name + ' ' + self.__job.get_sub_file() )
    fh.write( '\n')

    fh.write( 'RETRY ' + self.__name + ' ' + str(self.__retry) + '\n' )

  def write_category(self,fh):
    """
    Write the DAG entry for this node's category to the DAG file descriptor.
    @param fh: descriptor of open DAG file.
    """
    fh.write( 'CATEGORY ' + self.__name + ' ' + self.__category + '\n' )

  def write_priority(self,fh):
    """
    Write the DAG entry for this node's priority to the DAG file descriptor.
    @param fh: descriptor of open DAG file.
    """
    fh.write( 'PRIORITY ' + self.__name + ' ' + self.__priority + '\n' )

  def write_vars(self,fh):
    """
    Write the variable (macro) options and arguments to the DAG file
    descriptor.
    @param fh: descriptor of open DAG file.
    """
    if self.__macros or self.__opts or self.__args:
      fh.write( 'VARS ' + self.__name )
    for k in self.__macros.keys():
      fh.write( ' ' + str(k) + '="' + str(self.__macros[k]) + '"' )
    for k in self.__opts.keys():
      fh.write( ' ' + str(k) + '="' + str(self.__opts[k]) + '"' )
    if self.__args:
      for i in range(self.__arg_index):
        # str() so non-string arguments are written like the options above
        fh.write( ' macroargument' + str(i) + '="' + str(self.__args[i]) + '"' )
    fh.write( '\n' )

  def write_parents(self,fh):
    """
    Write the parent/child relations for this job to the DAG file descriptor.
    @param fh: descriptor of open DAG file.
    """
    for parent in self.__parents:
      fh.write( 'PARENT ' + str(parent) + ' CHILD ' + str(self) + '\n' )

  def write_pre_script(self,fh):
    """
    Write the pre script for the job, if there is one
    @param fh: descriptor of open DAG file.
    """
    if self.__pre_script:
      fh.write( 'SCRIPT PRE ' + str(self) + ' ' + self.__pre_script + ' ' +
        ' '.join(str(a) for a in self.__pre_script_args) + '\n' )

  def write_post_script(self,fh):
    """
    Write the post script for the job, if there is one
    @param fh: descriptor of open DAG file.
    """
    if self.__post_script:
      fh.write( 'SCRIPT POST ' + str(self) + ' ' + self.__post_script + ' ' +
        ' '.join(str(a) for a in self.__post_script_args) + '\n' )

  def write_input_files(self, fh):
    """
    Write as a comment into the DAG file the list of input files
    for this DAG node.

    @param fh: descriptor of open DAG file.
    """
    for f in self.__input_files:
      fh.write("## Job %s requires input file %s\n" % (self.__name, f))

  def write_output_files(self, fh):
    """
    Write as a comment into the DAG file the list of output files
    for this DAG node.

    @param fh: descriptor of open DAG file.
    """
    for f in self.__output_files:
      fh.write("## Job %s generates output file %s\n" % (self.__name, f))

  def set_log_file(self,log):
    """
    Set the Condor log file to be used by this CondorJob.
    @param log: path of Condor log file.
    """
    self.__job.set_log_file(log)

  def add_parent(self,node):
    """
    Add a parent to this node. This node will not be executed until the
    parent node has run successfully.
    @param node: CondorDAGNode to add as a parent.
    @raise CondorDAGNodeError: if node is not a CondorDAGNode or a
    CondorDAGManNode.
    """
    if not isinstance(node, (CondorDAGNode,CondorDAGManNode) ):
      raise CondorDAGNodeError("Parent must be a CondorDAGNode or a CondorDAGManNode")
    self.__parents.append( node )

  def get_cmd_tuple_list(self):
    """
    Return a list of tuples containing the command line arguments.

    Options become ("--name", value) or ("-name", value) pairs and
    positional arguments become (value, "") pairs, with DAGMan macros
    of the form $(name) replaced by this node's values.
    """

    # pattern to find DAGman macros
    pat = re.compile(r'\$\((.+)\)')
    argpat = re.compile(r'\d+')

    cmd_list = []

    # first parse the options and replace macros with values
    options = self.job().get_opts()
    macros = self.get_opts()

    for k in options:
      val = options[k]
      m = pat.match(val)
      if m:
        cmd_list.append(("--%s" % k, str(macros[m.group(1)])))
      else:
        cmd_list.append(("--%s" % k, str(val)))

    # second parse the short options and replace macros with values
    options = self.job().get_short_opts()

    for k in options:
      val = options[k]
      m = pat.match(val)
      if m:
        cmd_list.append(("-%s" % k, str(macros[m.group(1)])))
      else:
        cmd_list.append(("-%s" % k, str(val)))

    # lastly parse the arguments and replace macros with values
    args = self.job().get_args()
    macros = self.get_args()

    for a in args:
      m = pat.search(a)
      if m:
        arg_index = int(argpat.findall(a)[0])
        try:
          cmd_list.append(("%s" % macros[arg_index], ""))
        except IndexError:
          # NOTE: a bare empty string (not a tuple) is appended when this
          # node has no value for the argument; kept as-is because
          # consumers may rely on indexing into it raising IndexError
          cmd_list.append("")
      else:
        cmd_list.append(("%s" % a, ""))

    return cmd_list

  def get_cmd_line(self):
    """
    Return the full command line that will be used when this node
    is run by DAGman.
    """
    return "".join(
      ' '.join(argument) + " " for argument in self.get_cmd_tuple_list())

  def finalize(self):
    """
    The finalize method of a node is called before the node is
    finally added to the DAG and can be overridden to do any last
    minute clean up (such as setting extra command line arguments)
    """
    pass
1570 1571
class CondorDAGManNode(CondorDAGNode):
  """
  Condor DAGMan node class. Appropriate for setting up DAGs to run within a
  DAG. Adds the user-tag functionality to condor_dagman processes running in
  the DAG. May also be used to extend dagman-node specific functionality.
  """
  def __init__(self, job):
    """
    @param job: the job this node wraps; it is validated by
    CondorDAGNode.__init__
    """
    CondorDAGNode.__init__(self, job)
    # pegasus planning options for a sub-dax
    self.__reduce_dax = False
    self.__static_pfn_cache = None
    self.__cluster_jobs = None
    # per-node bookkeeping
    self.__maxjobs_categories = []
    self.__user_tag = None

  def set_user_tag(self,usertag):
    """
    Set the user tag that is passed to the analysis code.
    @param usertag: the user tag to identify the job (stored as a string)
    """
    tag = str(usertag)
    self.__user_tag = tag

  def get_user_tag(self):
    """
    Returns the usertag string
    """
    tag = self.__user_tag
    return tag

  def add_maxjobs_category(self,categoryName,maxJobsNum):
    """
    Record a category called categoryName with a maxjobs of maxJobsNum.
    The pair is stored as a tuple of strings.
    @param categoryName: name of the category
    @param maxJobsNum: maximum number of jobs for that category
    """
    entry = (str(categoryName), str(maxJobsNum))
    self.__maxjobs_categories.append(entry)

  def get_maxjobs_categories(self):
    """
    Return the list of (categoryName,maxJobsNum) tuples
    """
    categories = self.__maxjobs_categories
    return categories

  def set_cluster_jobs(self,cluster):
    """
    Set the type of job clustering pegasus can use to collapse jobs
    @param cluster: clustering type (stored as a string)
    """
    clustering = str(cluster)
    self.__cluster_jobs = clustering

  def get_cluster_jobs(self):
    """
    Returns the job clustering type set for this node, or None
    """
    clustering = self.__cluster_jobs
    return clustering

  def set_reduce_dax(self, rd):
    """
    Set the flag that tells Pegasus to reduce the DAX based on existing PFNs
    @param rd: True or False
    """
    flag = rd
    self.__reduce_dax = flag

  def get_reduce_dax(self):
    """
    Return the flag that tells Pegasus to reduce the DAX based on existing PFNs
    """
    flag = self.__reduce_dax
    return flag

  def set_static_pfn_cache(self, file):
    """
    Use the --cache option to pass a static PFN cache to pegasus-plan
    @param file: full path to the cache file (stored as a string)
    """
    cache_path = str(file)
    self.__static_pfn_cache = cache_path

  def get_static_pfn_cache(self):
    """
    Return the path to a static PFN cache
    """
    cache_path = self.__static_pfn_cache
    return cache_path
1653 1654
1655 -class CondorDAG:
1656 """ 1657 A CondorDAG is a Condor Directed Acyclic Graph that describes a collection 1658 of Condor jobs and the order in which to run them. All Condor jobs in the 1659 DAG must write their Codor logs to the same file. 1660 NOTE: The log file must not be on an NFS mounted system as the Condor jobs 1661 must be able to get an exclusive file lock on the log file. 1662 """
1663 - def __init__(self,log,dax=False):
1664 """ 1665 @param log: path to log file which must not be on an NFS mounted file system. 1666 @param dax: Set to 1 to create an abstract DAG (a DAX) 1667 """ 1668 self.__log_file_path = log 1669 self.__dax = dax 1670 self.__dag_file_path = None 1671 self.__dax_file_path = None 1672 self.__jobs = [] 1673 self.__nodes = [] 1674 self.__maxjobs_categories = [] 1675 self.__integer_node_names = 0 1676 self.__node_count = 0 1677 self.__nodes_finalized = 0 1678 self.__pegasus_worker = None 1679 self.__pfn_cache=[]
1680
1681 - def get_nodes(self):
1682 """ 1683 Return a list containing all the nodes in the DAG 1684 """ 1685 return self.__nodes
1686
1687 - def get_jobs(self):
1688 """ 1689 Return a list containing all the jobs in the DAG 1690 """ 1691 return self.__jobs
1692
1693 - def is_dax(self):
1694 """ 1695 Returns true if this DAG is really a DAX 1696 """ 1697 return self.__dax
1698
1699 - def set_integer_node_names(self):
1700 """ 1701 Use integer node names for the DAG 1702 """ 1703 self.__integer_node_names = 1
1704
1705 - def set_dag_file(self, path):
1706 """ 1707 Set the name of the file into which the DAG is written. 1708 @param path: path to DAG file. 1709 """ 1710 self.__dag_file_path = path + '.dag'
1711
1712 - def get_dag_file(self):
1713 """ 1714 Return the path to the DAG file. 1715 """ 1716 if not self.__log_file_path: 1717 raise CondorDAGError("No path for DAG file") 1718 else: 1719 return self.__dag_file_path
1720
1721 - def set_dax_file(self, path):
1722 """ 1723 Set the name of the file into which the DAG is written. 1724 @param path: path to DAG file. 1725 """ 1726 self.__dax_file_path = path + '.dax'
1727
1728 - def get_dax_file(self):
1729 """ 1730 Return the path to the DAG file. 1731 """ 1732 if not self.__log_file_path: 1733 raise CondorDAGError("No path for DAX file") 1734 else: 1735 return self.__dax_file_path
1736
def add_node(self,node):
  """
  Add a CondorDAGNode to this DAG. The job used by the node is also
  recorded in the DAG's list of jobs, once per unique job, so that the
  submit files needed by the DAG can be written without duplicates.

  @param node: CondorDAGNode to add to the CondorDAG.
  @raise CondorDAGError: if node is not a CondorDAGNode (or subclass).
  """
  if not isinstance(node, CondorDAGNode):
    raise CondorDAGError("Nodes must be class CondorDAGNode or subclass")

  # sub-dag nodes keep their own log; regular jobs share the DAG log file
  is_subdag = isinstance(node.job(), CondorDAGManJob)
  if not is_subdag:
    node.set_log_file(self.__log_file_path)

  self.__nodes.append(node)

  # the counter advances for every node, whether or not it is used as a name
  if self.__integer_node_names:
    node.set_name(str(self.__node_count))
  self.__node_count += 1

  known_jobs = self.__jobs
  if node.job() not in known_jobs:
    known_jobs.append(node.job())
1755
1756 - def add_maxjobs_category(self,categoryName,maxJobsNum):
1757 """ 1758 Add a category to this DAG called categoryName with a maxjobs of maxJobsNum. 1759 @param node: Add (categoryName,maxJobsNum) tuple to CondorDAG.__maxjobs_categories. 1760 """ 1761 self.__maxjobs_categories.append((str(categoryName),str(maxJobsNum)))
1762
1763 - def get_maxjobs_categories(self):
1764 """ 1765 Return an array of tuples containing (categoryName,maxJobsNum) 1766 """ 1767 return self.__maxjobs_categories
1768
1769 - def set_pegasus_worker(self, path):
1770 """ 1771 Set the path of a pagsus worker package to use for the workflow. 1772 @param path: path to worker package. 1773 """ 1774 self.__pegasus_worker = path
1775
1776 - def get_pegasus_worker(self):
1777 """ 1778 Return the path to the pegasus worker package. 1779 """ 1780 return self.__pegasus_worker
1781
1782 - def write_maxjobs(self,fh,category):
1783 """ 1784 Write the DAG entry for this category's maxjobs to the DAG file descriptor. 1785 @param fh: descriptor of open DAG file. 1786 @param category: tuple containing type of jobs to set a maxjobs limit for 1787 and the maximum number of jobs of that type to run at once. 1788 """ 1789 fh.write( 'MAXJOBS ' + str(category[0]) + ' ' + str(category[1]) + '\n' )
1790
1791 - def write_sub_files(self):
1792 """ 1793 Write all the submit files used by the dag to disk. Each submit file is 1794 written to the file name set in the CondorJob. 1795 """ 1796 if not self.__nodes_finalized: 1797 for node in self.__nodes: 1798 node.finalize() 1799 if not self.is_dax(): 1800 for job in self.__jobs: 1801 job.write_sub_file()
1802
1803 - def add_pfn_cache(self,pfn_list):
1804 """ 1805 Add an lfn pfn and pool tuple to the pfn cache 1806 Note: input looks like ('/path/to/file','file:///path/to/file','local') 1807 """ 1808 self.__pfn_cache += pfn_list
1809
1810 - def get_pfn_cache(self):
1811 """ 1812 Return the pfn cache 1813 """ 1814 return self.__pfn_cache
1815
1816 - def write_concrete_dag(self):
1817 """ 1818 Write all the nodes in the DAG to the DAG file. 1819 """ 1820 if not self.__dag_file_path: 1821 raise CondorDAGError("No path for DAG file") 1822 try: 1823 dagfile = open( self.__dag_file_path, 'w' ) 1824 except: 1825 raise CondorDAGError("Cannot open file " + self.__dag_file_path) 1826 for node in self.__nodes: 1827 node.write_job(dagfile) 1828 node.write_vars(dagfile) 1829 if node.get_category(): 1830 node.write_category(dagfile) 1831 if node.get_priority(): 1832 node.write_priority(dagfile) 1833 node.write_pre_script(dagfile) 1834 node.write_post_script(dagfile) 1835 node.write_input_files(dagfile) 1836 node.write_output_files(dagfile) 1837 for node in self.__nodes: 1838 node.write_parents(dagfile) 1839 for category in self.__maxjobs_categories: 1840 self.write_maxjobs(dagfile, category) 1841 dagfile.close()
1842
1843 - def write_abstract_dag(self):
1844 """ 1845 Write all the nodes in the workflow to the DAX file. 1846 """ 1847 1848 # keep track of if we are using stampede at TACC 1849 using_stampede = False 1850 1851 if not self.__dax_file_path: 1852 # this workflow is not dax-compatible, so don't write a dax 1853 return 1854 1855 import Pegasus.DAX3 1856 # create the workflow object 1857 dax_name = os.path.split(self.__dax_file_path)[-1] 1858 dax_basename = '.'.join(dax_name.split('.')[0:-1]) 1859 workflow = Pegasus.DAX3.ADAG( dax_basename ) 1860 1861 # we save the ID number to DAG node name mapping so that 1862 # we can easily write out the child/parent relationship 1863 # later 1864 node_job_object_dict = {} 1865 1866 # FIXME disctionary of executables and pfns in the workflow 1867 # Pegasus should take care of this so we don't have to 1868 workflow_executable_dict = {} 1869 workflow_pfn_dict = {} 1870 1871 # Add PFN caches for this workflow 1872 for pfn_tuple in self.get_pfn_cache(): 1873 workflow_pfn_dict[pfn_tuple[0]] = pfn_tuple 1874 1875 if self.get_pegasus_worker(): 1876 # write the executable into the dax 1877 worker_package = Pegasus.DAX3.Executable( 1878 namespace="pegasus", name="worker", 1879 os="linux", arch="x86_64", installed=False) 1880 worker_package.addPFN(Pegasus.DAX3.PFN(self.get_pegasus_worker(),"local")) 1881 workflow_executable_dict['pegasus-pegasus_worker'] = worker_package 1882 1883 # check for the pegasus-cluster package 1884 for path in os.environ["PATH"].split(":"): 1885 cluster_path = os.path.join(path,"pegasus-cluster") 1886 if os.path.exists(cluster_path): 1887 # and add to the dax if it exists 1888 seqexec_package = Pegasus.DAX3.Executable( 1889 namespace="pegasus", name="seqexec", 1890 os="linux", arch="x86_64", installed=True) 1891 seqexec_package.addPFN(Pegasus.DAX3.PFN(cluster_path,"local")) 1892 workflow_executable_dict['pegasus-pegasus_seqexec'] = seqexec_package 1893 1894 id = 0 1895 for node in self.__nodes: 1896 if self.is_dax() and isinstance(node, 
LSCDataFindNode): 1897 pass 1898 1899 elif isinstance(node.job(), CondorDAGManJob): 1900 id += 1 1901 id_tag = "ID%06d" % id 1902 node_name = node._CondorDAGNode__name 1903 1904 if node.job().get_dax() is None: 1905 # write this node as a sub-dag 1906 subdag_name = os.path.split(node.job().get_dag())[-1] 1907 try: 1908 subdag_exec_path = os.path.join( 1909 os.getcwd(),node.job().get_dag_directory()) 1910 except AttributeError: 1911 subdag_exec_path = os.getcwd() 1912 1913 subdag = Pegasus.DAX3.DAG(subdag_name,id=id_tag) 1914 subdag.addProfile(Pegasus.DAX3.Profile("dagman","DIR",subdag_exec_path)) 1915 1916 subdag_file = Pegasus.DAX3.File(subdag_name) 1917 subdag_file.addPFN(Pegasus.DAX3.PFN(os.path.join(subdag_exec_path,subdag_name),"local")) 1918 workflow.addFile(subdag_file) 1919 workflow.addDAG(subdag) 1920 node_job_object_dict[node_name] = subdag 1921 1922 else: 1923 # write this node as a sub-dax 1924 subdax_name = os.path.split(node.job().get_dax())[-1] 1925 dax_subdir = node.job().get_dag_directory() 1926 if dax_subdir: 1927 subdax_path = os.path.join( 1928 os.getcwd(),node.job().get_dag_directory(),subdax_name) 1929 else: 1930 subdax_path = os.path.join(os.getcwd(),subdax_name) 1931 dax_subdir = '.' 1932 1933 subdax = Pegasus.DAX3.DAX(subdax_name,id=id_tag) 1934 1935 # FIXME pegasus should ensure these are unique 1936 for pfn_tuple in node.job().get_pfn_cache(): 1937 workflow_pfn_dict[pfn_tuple[0]] = pfn_tuple 1938 1939 # set the storage, execute, and output directory locations 1940 pegasus_args = """--dir %s """ % dax_subdir 1941 pegasus_args += """--output-dir %s """ % dax_subdir 1942 1943 # set the maxjobs categories for the subdax 1944 # FIXME pegasus should expose this in the dax, so it can 1945 # be handled like the MAXJOBS keyword in dag files 1946 for maxjobcat in node.get_maxjobs_categories(): 1947 pegasus_args += "-Dpegasus.dagman." 
+ maxjobcat[0] + ".maxjobs=" + maxjobcat[1] + " " 1948 1949 if not self.is_dax(): 1950 pegasus_args += "--nocleanup " 1951 1952 if node.get_cluster_jobs(): 1953 pegasus_args += "--cluster " + node.get_cluster_jobs() + " " 1954 1955 if node.get_reduce_dax() is False: 1956 pegasus_args += " --force " 1957 1958 if node.get_static_pfn_cache(): 1959 pegasus_args += " --cache " + node.get_static_pfn_cache() + " " 1960 1961 pegasus_args += "--output-site local -vvvvvv" 1962 subdax.addArguments(pegasus_args) 1963 1964 subdax_file = Pegasus.DAX3.File(subdax_name) 1965 subdax_file.addPFN(Pegasus.DAX3.PFN(subdax_path,"local")) 1966 workflow.addFile(subdax_file) 1967 workflow.addDAX(subdax) 1968 node_job_object_dict[node_name] = subdax 1969 1970 else: 1971 # write this job as a regular node 1972 executable = node.job()._CondorJob__executable 1973 node_name = node._CondorDAGNode__name 1974 1975 id += 1 1976 id_tag = "ID%06d" % id 1977 node_job_object_dict[node_name] = id_tag 1978 1979 # get the name of the executable 1980 executable_namespace = 'ligo-' + str(node.job().__class__.__name__).lower() 1981 executable_base = os.path.basename(executable) 1982 1983 workflow_job = Pegasus.DAX3.Job( namespace=executable_namespace, 1984 name=executable_base, version="1.0", id=id_tag) 1985 1986 cmd_line = node.get_cmd_tuple_list() 1987 1988 # loop through all filenames looking for them in the command 1989 # line so that they can be replaced appropriately by xml tags 1990 input_node_file_dict = {} 1991 for f in node.get_input_files(): 1992 input_node_file_dict[f] = 1 1993 1994 for f in input_node_file_dict.keys(): 1995 workflow_job.uses(Pegasus.DAX3.File(os.path.basename(f)),link=Pegasus.DAX3.Link.INPUT,register=False,transfer=True) 1996 1997 output_node_file_dict = {} 1998 for f in node.get_output_files(): 1999 output_node_file_dict[f] = 1 2000 2001 checkpoint_node_file_dict = {} 2002 for f in node.get_checkpoint_files(): 2003 checkpoint_node_file_dict[f] = 1 2004 2005 for f in 
output_node_file_dict.keys(): 2006 workflow_job.uses(Pegasus.DAX3.File(os.path.basename(f)),link=Pegasus.DAX3.Link.OUTPUT,register=False,transfer=True) 2007 2008 for f in checkpoint_node_file_dict.keys(): 2009 workflow_job.uses(Pegasus.DAX3.File(os.path.basename(f)),link=Pegasus.DAX3.Link.CHECKPOINT,register=False,transfer=True) 2010 2011 node_file_dict = dict( list(input_node_file_dict.items()) + list(output_node_file_dict.items()) + list(checkpoint_node_file_dict.items()) ) 2012 2013 for job_arg in cmd_line: 2014 try: 2015 if job_arg[0] in node_file_dict: 2016 workflow_job.addArguments(Pegasus.DAX3.File(os.path.basename(job_arg[0]))) 2017 elif job_arg[1] in node_file_dict: 2018 workflow_job.addArguments(job_arg[0], Pegasus.DAX3.File(os.path.basename(job_arg[1]))) 2019 elif len(job_arg[1].split(' ')) != 1: 2020 args = [job_arg[0]] 2021 for arg in job_arg[1].split(' '): 2022 if arg in node_file_dict: 2023 args.append(Pegasus.DAX3.File(os.path.basename(arg))) 2024 else: 2025 args.append(arg) 2026 workflow_job.addArguments(*args) 2027 else: 2028 workflow_job.addArguments(job_arg[0], job_arg[1]) 2029 except IndexError: 2030 pass 2031 2032 # Check for desired grid site 2033 if node.job().get_grid_site(): 2034 this_grid_site = node.job().get_grid_site() 2035 workflow_job.addProfile(Pegasus.DAX3.Profile('hints','execution.site',this_grid_site)) 2036 if this_grid_site == 'stampede-dev' or this_grid_site=='stampede': 2037 using_stampede = True 2038 2039 # write the executable into the dax 2040 job_executable = Pegasus.DAX3.Executable( 2041 namespace=executable_namespace, 2042 name=executable_base, version="1.0", 2043 os="linux", arch="x86_64", 2044 installed=node.job().get_executable_installed()) 2045 2046 executable_path = os.path.join(os.getcwd(),executable) 2047 job_executable.addPFN(Pegasus.DAX3.PFN(executable_path,"local")) 2048 2049 workflow_executable_dict[executable_namespace + executable_base] = job_executable 2050 2051 # write the mpi cluster parameter for the 
job 2052 if node.job().get_dax_mpi_cluster(): 2053 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","job.aggregator","mpiexec")) 2054 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","clusters.size",str(node.job().get_dax_mpi_cluster()))) 2055 2056 # write the grid start parameter for this node 2057 # if the grid start is not None 2058 if node.get_grid_start(): 2059 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","gridstart",node.get_grid_start())) 2060 2061 # write the bundle parameter if this node has one 2062 if node.get_dax_collapse(): 2063 workflow_job.addProfile(Pegasus.DAX3.Profile("pegasus","clusters.size",str(node.get_dax_collapse()))) 2064 2065 # write number of times the node should be retried 2066 if node.get_retry(): 2067 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","retry",str(node.get_retry()))) 2068 2069 # write the post script for this node 2070 if node.get_post_script(): 2071 post_script_base = os.path.basename(node.get_post_script()) 2072 post_script_path = os.path.join(os.getcwd(),node.get_post_script()) 2073 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","post",post_script_base)) 2074 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","post.path." 
+ post_script_base,post_script_path)) 2075 2076 # write the post script for this node 2077 if node.get_post_script_arg(): 2078 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","post.arguments",' '.join(node.get_post_script_arg()))) 2079 2080 # write the dag node category if this node has one 2081 if node.get_category(): 2082 workflow_job.addProfile(Pegasus.DAX3.Profile("dagman","category",str(node.get_category()))) 2083 2084 # write the dag node priority if this node has one 2085 if node.get_priority(): 2086 workflow_job.addProfile(Pegasus.DAX3.Profile("condor","priority",str(node.get_priority()))) 2087 2088 # write the universe that this job should run in to the dax 2089 if node.get_dax_collapse(): 2090 # collapsed jobs must run in the vanilla universe 2091 workflow_job.addProfile(Pegasus.DAX3.Profile("condor","universe","vanilla")) 2092 else: 2093 workflow_job.addProfile(Pegasus.DAX3.Profile("condor","universe",node.job().get_universe())) 2094 2095 # Add condor commands to sub files 2096 for ccmd_key, ccmd_val in node.job().get_condor_cmds().items(): 2097 workflow_job.addProfile(Pegasus.DAX3.Profile("condor", ccmd_key, ccmd_val)) 2098 2099 # Add stdout and stderr 2100 workflow_job.setStdout(node.job().get_stdout_file()) 2101 workflow_job.setStderr(node.job().get_stderr_file()) 2102 2103 # add any other user specified condor commands or classads 2104 for p in node.get_pegasus_profile(): 2105 workflow_job.addProfile(Pegasus.DAX3.Profile(p[0],p[1],p[2])) 2106 2107 # finally add this job to the workflow 2108 workflow.addJob(workflow_job) 2109 node_job_object_dict[node_name] = workflow_job 2110 2111 2112 # print parent-child relationships to DAX 2113 for node in self.__nodes: 2114 if self.is_dax() and isinstance(node, LSCDataFindNode): 2115 pass 2116 elif self.is_dax() and ( len(node._CondorDAGNode__parents) == 1 ) and isinstance(node._CondorDAGNode__parents[0], LSCDataFindNode): 2117 pass 2118 else: 2119 child_job_object = node_job_object_dict[str(node)] 2120 if 
node._CondorDAGNode__parents: 2121 for parent in node._CondorDAGNode__parents: 2122 if self.is_dax() and isinstance(parent, LSCDataFindNode): 2123 pass 2124 else: 2125 parent_job_object = node_job_object_dict[str(parent)] 2126 workflow.addDependency(Pegasus.DAX3.Dependency(parent=parent_job_object, child=child_job_object)) 2127 2128 # FIXME put all the executables in the workflow 2129 for exec_key in workflow_executable_dict.keys(): 2130 workflow.addExecutable(workflow_executable_dict[exec_key]) 2131 2132 # FIXME if we are running on stampede, add the mpi wrapper job 2133 if using_stampede: 2134 prod_mpiexec = Pegasus.DAX3.Executable(namespace="pegasus", 2135 name="mpiexec", os="linux", arch="x86_64", installed="true") 2136 prod_mpiexec.addPFN(Pegasus.DAX3.PFN("file:///home1/02796/dabrown/bin/mpi-cluster-wrapper-impi.sh","stampede")) 2137 workflow.addExecutable(prod_mpiexec) 2138 2139 dev_mpiexec = Pegasus.DAX3.Executable(namespace="pegasus", 2140 name="mpiexec", os="linux", arch="x86_64", installed="true") 2141 dev_mpiexec.addPFN(Pegasus.DAX3.PFN("file:///home1/02796/dabrown/bin/mpi-cluster-wrapper-impi.sh","stampede-dev")) 2142 workflow.addExecutable(dev_mpiexec) 2143 2144 # FIXME put all the pfns in the workflow 2145 for pfn_key in workflow_pfn_dict.keys(): 2146 f = Pegasus.DAX3.File(workflow_pfn_dict[pfn_key][0]) 2147 f.addPFN(Pegasus.DAX3.PFN(workflow_pfn_dict[pfn_key][1],workflow_pfn_dict[pfn_key][2])) 2148 workflow.addFile(f) 2149 2150 f = open(self.__dax_file_path,"w") 2151 workflow.writeXML(f) 2152 f.close()
2153
def write_dag(self):
  """
  Write either a dag or a dax.

  Every node is finalized first unless the nodes are already flagged as
  finalized; afterwards both the concrete and the abstract DAG writers are
  invoked, in that order.
  """
  needs_finalizing = not self.__nodes_finalized
  if needs_finalizing:
    for dag_node in self.__nodes:
      dag_node.finalize()

  for writer in (self.write_concrete_dag, self.write_abstract_dag):
    writer()
2163
def write_script(self):
  """
  Write the workflow to a script (.sh instead of .dag).

  Assuming that parents were added to the DAG before their children,
  dependencies should be handled correctly.

  The script is written next to the DAG file, with the DAG file's final
  extension replaced by ".sh", and is made executable for its owner.
  @raise CondorDAGError: if no DAG file path has been set or the script
  file cannot be opened for writing.
  """
  if not self.__dag_file_path:
    raise CondorDAGError("No path for DAG file")

  # splitext only strips an extension from the last path component, so a
  # DAG path without an extension (or with dots in a directory name) still
  # yields a sensible script name
  outfilename = os.path.splitext(self.__dag_file_path)[0] + ".sh"
  try:
    outfile = open(outfilename, "w")
  except (IOError, OSError):
    # report the file that actually failed to open
    raise CondorDAGError("Cannot open file " + outfilename)

  # the with block closes the script even if a node fails to render
  with outfile:
    for node in self.__nodes:
      outfile.write("# Job %s\n" % node.get_name())
      # Check if this is a DAGMAN Node
      if isinstance(node,CondorDAGManNode):
        outfile.write("condor_submit_dag %s\n\n" % (node.job().get_dag()))
      else:
        outfile.write("%s %s\n\n" % (node.job().get_executable(),
          node.get_cmd_line()))

  # add the owner-execute bit to whatever mode the file was created with
  os.chmod(outfilename, os.stat(outfilename)[0] | stat.S_IEXEC)
2191
def prepare_dax(self,grid_site=None,tmp_exec_dir='.',peg_frame_cache=None):
  """
  Sets up a pegasus script for the given dag

  Four files are written to the current working directory:
  pegasus.properties, pegasus_submit_dax, pegasus_basedir (the two scripts
  are made executable) and the sites.xml site catalog. A help message
  explaining how to submit and monitor the workflow is then printed.

  The Stampede site entries are only written if the user's remote home
  directory can be obtained by running gsissh; otherwise an XML comment
  noting that Stampede is disabled is written instead.
  @param grid_site: comma separated list of execution sites. If not set
  the execution site is 'local'.
  @param tmp_exec_dir: directory substituted into the pegasus scripts; its
  basename is used to build the remote scratch paths in the site catalog.
  @param peg_frame_cache: if set, the basename of this path, joined to the
  current directory, is written as the pegasus replica catalog file.
  """
  # only needed here, for the Stampede probe below
  import subprocess

  pwd = os.getcwd()
  dax_file = self.get_dax_file()
  uber_dag = dax_file.replace('.dax','') + '-0.dag'
  scratch_name = os.path.basename(tmp_exec_dir)

  # set up site and dir options for pegasus-submit-dax
  dirs_entry='--relative-dir .'
  if grid_site:
    exec_site=grid_site
  else:
    exec_site='local'

  with open("pegasus.properties", "w") as pegprop_fh:
    # write the default properties
    pegprop_fh.write(PEGASUS_PROPERTIES % (pwd))

    if grid_site:
      for site in exec_site.split(','):
        # if virgo sites are being used, then we don't have a shared fs
        if site == 'nikhef':
          dirs_entry += ' --staging-site nikhef=nikhef-srm'
        else:
          dirs_entry += ' --staging-site %s=%s' % (site,site)
        if site == 'nikhef' or site == 'bologna':
          pegprop_fh.write(
"""
###############################################################################
# Data Staging Configuration

# Pegasus will be setup to execute jobs on an execution site without relying
# on a shared filesystem between the head node and the worker nodes. If this
# is set, specify staging site ( using --staging-site option to pegasus-plan)
# to indicate the site to use as a central storage location for a workflow.
pegasus.data.configuration=nonsharedfs


""")

    # if a frame cache has been specified, write it to the properties
    # however note that this is overridden by the --cache option to pegasus
    if peg_frame_cache:
      pegprop_fh.write("pegasus.catalog.replica.file=%s\n" % (os.path.join(pwd,os.path.basename(peg_frame_cache))))

  # write the pegasus_submit_dax and make it executable
  with open("pegasus_submit_dax", "w") as peg_fh:
    peg_fh.write(PEGASUS_SCRIPT % ( tmp_exec_dir, pwd,
      uber_dag, dax_file, dirs_entry, exec_site ))
  os.chmod("pegasus_submit_dax",0o755)

  # write a shell script that can return the basedir and uber-concrete-dag
  with open("pegasus_basedir", "w") as basedir_fh:
    basedir_fh.write(PEGASUS_BASEDIR_SCRIPT % ( tmp_exec_dir, uber_dag ))
  os.chmod("pegasus_basedir",0o755)

  # write the site catalog file which is needed by pegasus
  try:
    hostname = socket.gethostbyaddr(socket.gethostname())[0]
  except socket.error:
    hostname = 'localhost'

  # Ask Stampede for the user's home directory, from which the scratch
  # directory is derived. This is best effort: any failure to run gsissh
  # or to parse its output simply disables the Stampede sites.
  try:
    stampede_home = subprocess.check_output(
      ['gsissh','-o','BatchMode=yes','-p','2222','stampede.tacc.xsede.org','pwd'],
      universal_newlines=True)
    # text mode plus strip() give a clean str on both python 2 and 3
    stampede_home = stampede_home.strip().split('/')
    stampede_magic_number = stampede_home[2]
    stampede_username = stampede_home[3]
    shared_scratch = "/work/%s/%s/ihope-workflow/%s" % (
      stampede_magic_number,stampede_username,scratch_name)
  except (OSError, ValueError, IndexError, subprocess.CalledProcessError):
    shared_scratch = None

  with open( 'sites.xml', 'w' ) as sitefile:
    # the XML declaration has to be the very first thing in the file
    sitefile.write("""\
<?xml version="1.0" encoding="UTF-8"?>
<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
  <site handle="local" arch="x86_64" os="LINUX">
    <grid type="gt2" contact="%s/jobmanager-fork" scheduler="Fork" jobtype="auxillary" total-nodes="50"/>
    <grid type="gt2" contact="%s/jobmanager-condor" scheduler="Condor" jobtype="compute" total-nodes="50"/>
    <directory path="%s" type="shared-scratch" free-size="null" total-size="null">
        <file-server operation="all" url="file://%s">
        </file-server>
    </directory>
    <directory path="%s" type="shared-storage" free-size="null" total-size="null">
        <file-server operation="all" url="file://%s">
        </file-server>
    </directory>
    <replica-catalog type="LRC" url="rlsn://smarty.isi.edu">
    </replica-catalog>
""" % (hostname,hostname,pwd,pwd,pwd,pwd))

    # pass selected environment variables to the local site, if they are set
    for env_key in ('GLOBUS_LOCATION', 'LD_LIBRARY_PATH', 'PYTHONPATH',
        'PEGASUS_HOME', 'LIGO_DATAFIND_SERVER', 'S6_SEGMENT_SERVER'):
      if env_key in os.environ:
        sitefile.write("""    <profile namespace="env" key="%s">%s</profile>\n""" % (env_key, os.environ[env_key]))

    sitefile.write("""\
    <profile namespace="env" key="JAVA_HEAPMAX">4096</profile>
    <profile namespace="pegasus" key="style">condor</profile>
    <profile namespace="condor" key="getenv">True</profile>
    <profile namespace="condor" key="should_transfer_files">YES</profile>
    <profile namespace="condor" key="when_to_transfer_output">ON_EXIT_OR_EVICT</profile>
  </site>
""")

    sitefile.write("""\
  <!-- Bologna cluster -->
  <site handle="bologna" arch="x86_64" os="LINUX">
    <grid type="cream" contact="https://ce01-lcg.cr.cnaf.infn.it:8443/ce-cream/services/CREAM2" scheduler="LSF" jobtype="compute" />
    <grid type="cream" contact="https://ce01-lcg.cr.cnaf.infn.it:8443/ce-cream/services/CREAM2" scheduler="LSF" jobtype="auxillary" />
    <directory type="shared-scratch" path="/storage/gpfs_virgo4/virgo4/%s/">
        <file-server operation="all" url="srm://storm-fe-archive.cr.cnaf.infn.it:8444/srm/managerv2?SFN=/virgo4/%s/"/>
    </directory>
    <profile namespace="pegasus" key="style">cream</profile>
    <profile namespace="globus" key="queue">virgo</profile>
  </site>
""" % (scratch_name,scratch_name))

    sitefile.write("""\
  <!-- Nikhef Big Grid -->
  <site handle="nikhef" arch="x86_64" os="LINUX">
    <grid type="cream" contact="https://klomp.nikhef.nl:8443/ce-cream/services/CREAM2" scheduler="PBS" jobtype="compute" />
    <grid type="cream" contact="https://klomp.nikhef.nl:8443/ce-cream/services/CREAM2" scheduler="PBS" jobtype="auxillary" />
    <profile namespace="pegasus" key="style">cream</profile>
    <profile namespace="globus" key="queue">medium</profile>
  </site>
  <!-- Nikhef Stage in Site -->
  <site handle="nikhef-srm" arch="x86_64" os="LINUX">
    <directory type="shared-scratch" path="/%s/">
      <file-server operation="all" url="srm://tbn18.nikhef.nl:8446/srm/managerv2?SFN=/dpm/nikhef.nl/home/virgo/%s/" />
    </directory>
  </site>
""" % (scratch_name,scratch_name))

    if shared_scratch is not None:
      sitefile.write("""\
  <!-- XSEDE Stampede Cluster at TACC Development Queue -->
  <site handle="stampede-dev" arch="x86_64" os="LINUX">
     <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-fork" scheduler="Fork" jobtype="auxillary"/>
     <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-slurm" scheduler="unknown" jobtype="compute"/>
    <directory type="shared-scratch" path="%s">
      <file-server operation="all" url="gsiftp://gridftp.stampede.tacc.xsede.org%s"/>
    </directory>
    <profile namespace="env" key="PEGASUS_HOME">/usr</profile>
    <profile namespace="globus" key="queue">development</profile>
    <profile namespace="globus" key="maxwalltime">180</profile>
    <profile namespace="globus" key="host_count">1</profile>
    <profile namespace="globus" key="count">16</profile>
    <profile namespace="globus" key="jobtype">single</profile>
    <profile namespace="globus" key="project">TG-PHY140012</profile>
   </site>
""" % (shared_scratch,shared_scratch))

      sitefile.write("""\
  <!-- XSEDE Stampede Cluster at TACC Development Queue -->
  <site handle="stampede" arch="x86_64" os="LINUX">
     <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-fork" scheduler="Fork" jobtype="auxillary"/>
     <grid type="gt5" contact="login5.stampede.tacc.utexas.edu/jobmanager-slurm" scheduler="unknown" jobtype="compute"/>
    <directory type="shared-scratch" path="%s">
      <file-server operation="all" url="gsiftp://gridftp.stampede.tacc.xsede.org%s"/>
    </directory>
    <profile namespace="env" key="PEGASUS_HOME">/usr</profile>
    <profile namespace="globus" key="queue">development</profile>
    <profile namespace="globus" key="maxwalltime">540</profile>
    <profile namespace="globus" key="host_count">32</profile>
    <profile namespace="globus" key="count">512</profile>
    <profile namespace="globus" key="jobtype">single</profile>
    <profile namespace="globus" key="project">TG-PHY140012</profile>
   </site>
""" % (shared_scratch,shared_scratch))
    else:
      sitefile.write("""\
  <!-- XSEDE Stampede Cluster disabled as gsissh to TACC failed-->
""")

    sitefile.write("""\
</sitecatalog>""")

  # Write a help message telling the user how to run the workflow
  print()
  print("Created a workflow file which can be submitted by executing")
  print("""

  ./pegasus_submit_dax

in the analysis directory on a condor submit machine.

From the analysis directory on the condor submit machine, you can run the
command

  pegasus-status --long -t -i `./pegasus_basedir`

to check the status of your workflow. Once the workflow has finished you
can run the command

  pegasus-analyzer -t -i `./pegasus_basedir`

to debug any failed jobs.
""")
2418 2419
class AnalysisJob:
  """
  Describes a generic analysis job that filters LIGO data as configured by
  an ini file.
  """
  def __init__(self,cp,dax=False):
    """
    @param cp: ConfigParser object that contains the configuration for this job.
    @param dax: value returned by is_dax(); true if this job should behave
    as a DAX.
    """
    self.__cp = cp
    self.__dax = dax
    # The channel is optional: any failure to read the 'channel' option of
    # the [input] section leaves it unset. str.strip() is used rather than
    # string.strip(), which does not exist on python 3.
    try:
      self.__channel = self.__cp.get('input','channel').strip()
    except Exception:
      self.__channel = None

  def is_dax(self):
    """
    Returns true if this job should behave as a DAX
    """
    return self.__dax

  def get_config(self,sec,opt):
    """
    Get the configuration variable in a particular section of this job's ini
    file, with leading and trailing whitespace removed.
    @param sec: ini file section.
    @param opt: option from section sec.
    """
    return self.__cp.get(sec,opt).strip()

  def set_channel(self,channel):
    """
    Set the name of the channel that this job is filtering.  This will
    overwrite the value obtained at initialization.
    @param channel: name of the channel.
    """
    self.__channel = channel

  def channel(self):
    """
    Returns the name of the channel that this job is filtering. Note that
    channel is defined to be IFO independent, so this may be LSC-AS_Q or
    IOO-MC_F. The IFO is set on a per node basis, not a per job basis.
    Returns None if no channel was configured or set.
    """
    return self.__channel
2465 2466
class AnalysisNode(CondorDAGNode):
  """
  Contains the methods that allow an object to be built to analyse LIGO
  data in a Condor DAG.
  """
  def __init__(self):
    """
    Reset the analysis bookkeeping (times, ifo, input/output files and
    calibration) to its unset defaults.

    The user tag is initialised from the "user-tag" option of the node's
    job, so self.job() must already work when this constructor runs. This
    constructor does not call CondorDAGNode.__init__ itself.
    """
    self.__start = 0
    self.__end = 0
    self.__data_start = 0
    self.__pad_data = 0
    self.__data_end = 0
    self.__trig_start = 0
    self.__trig_end = 0
    self.__ifo = None
    self.__ifo_tag = None
    self.__input = None
    self.__output = None
    self.__calibration = None
    self.__calibration_cache = None
    self.__LHO2k = re.compile(r'H2')
    self.__user_tag = self.job().get_opts().get("user-tag", None)

  def set_start(self,time,pass_to_command_line=True):
    """
    Set the GPS start time of the analysis node by setting a --gps-start-time
    option to the node when it is executed. The data start time is set to
    the same value.
    @param time: GPS start time of job.
    @param pass_to_command_line: add gps-start-time as variable option.
    """
    if pass_to_command_line:
      self.add_var_opt('gps-start-time',time)
    self.__start = time
    self.__data_start = time
    #if not self.__calibration and self.__ifo and self.__start > 0:
    #  self.calibration()

  def get_start(self):
    """
    Get the GPS start time of the node.
    """
    return self.__start

  def set_end(self,time,pass_to_command_line=True):
    """
    Set the GPS end time of the analysis node by setting a --gps-end-time
    option to the node when it is executed. The data end time is set to
    the same value.
    @param time: GPS end time of job.
    @param pass_to_command_line: add gps-end-time as variable option.
    """
    if pass_to_command_line:
      self.add_var_opt('gps-end-time',time)
    self.__end = time
    self.__data_end = time

  def get_end(self):
    """
    Get the GPS end time of the node.
    """
    return self.__end

  def set_data_start(self,time):
    """
    Set the GPS start time of the data needed by this analysis node.
    @param time: GPS start time of job.
    """
    self.__data_start = time

  def get_data_start(self):
    """
    Get the GPS start time of the data needed by this node.
    """
    return self.__data_start

  def set_pad_data(self,pad):
    """
    Set the padding of the data needed by this analysis node. set_cache()
    widens the interval between the data start and data end times by this
    amount when it selects frame files.
    @param pad: amount of padding (presumably in seconds, like the GPS
    times it is added to).
    """
    self.__pad_data = pad

  def get_pad_data(self):
    """
    Get the padding of the data needed by this node.
    """
    return self.__pad_data

  def set_data_end(self,time):
    """
    Set the GPS end time of the data needed by this analysis node.
    @param time: GPS end time of job.
    """
    self.__data_end = time

  def get_data_end(self):
    """
    Get the GPS end time of the data needed by this node.
    """
    return self.__data_end

  def set_trig_start(self,time,pass_to_command_line=True):
    """
    Set the trig start time of the analysis node by setting a
    --trig-start-time option to the node when it is executed.
    @param time: trig start time of job.
    @param pass_to_command_line: add trig-start-time as a variable option.
    """
    if pass_to_command_line:
      self.add_var_opt('trig-start-time',time)
    self.__trig_start = time

  def get_trig_start(self):
    """
    Get the trig start time of the node.
    """
    return self.__trig_start

  def set_trig_end(self,time,pass_to_command_line=True):
    """
    Set the trig end time of the analysis node by setting a --trig-end-time
    option to the node when it is executed.
    @param time: trig end time of job.
    @param pass_to_command_line: add trig-end-time as a variable option.
    """
    if pass_to_command_line:
      self.add_var_opt('trig-end-time',time)
    self.__trig_end = time

  def get_trig_end(self):
    """
    Get the trig end time of the node.
    """
    return self.__trig_end

  def set_input(self,filename,pass_to_command_line=True):
    """
    Add an input to the node by adding a --input option. The file is also
    registered as an input file of the node.
    @param filename: option argument to pass as input.
    @param pass_to_command_line: add input as a variable option.
    """
    self.__input = filename
    if pass_to_command_line:
      self.add_var_opt('input', filename)
    self.add_input_file(filename)

  def get_input(self):
    """
    Get the file that will be passed as input.
    """
    return self.__input

  def set_output(self,filename,pass_to_command_line=True):
    """
    Add an output to the node by adding a --output option. The file is also
    registered as an output file of the node.
    @param filename: option argument to pass as output.
    @param pass_to_command_line: add output as a variable option.
    """
    self.__output = filename
    if pass_to_command_line:
      self.add_var_opt('output', filename)
    self.add_output_file(filename)

  def get_output(self):
    """
    Get the file that will be passed as output.
    """
    return self.__output

  def set_ifo(self,ifo):
    """
    Set the ifo name to analyze. If the channel name for the job is defined,
    then the name of the ifo is prepended to the channel name obtained
    from the job configuration file and passed with a --channel-name option.
    @param ifo: two letter ifo code (e.g. L1, H1 or H2).
    """
    self.__ifo = ifo
    if self.job().channel():
      self.add_var_opt('channel-name', ifo + ':' + self.job().channel())

  def get_ifo(self):
    """
    Returns the two letter IFO code for this node.
    """
    return self.__ifo

  def set_ifo_tag(self,ifo_tag,pass_to_command_line=True):
    """
    Set the ifo tag that is passed to the analysis code.
    @param ifo_tag: a string to identify one or more IFOs
    @param pass_to_command_line: add ifo-tag as a variable option.
    """
    self.__ifo_tag = ifo_tag
    if pass_to_command_line:
      self.add_var_opt('ifo-tag', ifo_tag)

  def get_ifo_tag(self):
    """
    Returns the IFO tag string
    """
    return self.__ifo_tag

  def set_user_tag(self,usertag,pass_to_command_line=True):
    """
    Set the user tag that is passed to the analysis code.
    @param usertag: the user tag to identify the job
    @param pass_to_command_line: add user-tag as a variable option.
    """
    self.__user_tag = usertag
    if pass_to_command_line:
      self.add_var_opt('user-tag', usertag)

  def get_user_tag(self):
    """
    Returns the usertag string
    """
    return self.__user_tag

  def set_cache(self,filename):
    """
    Set the LAL frame cache to use.
    @param filename: either a string or a list. A string is taken to be the
    name of a LAL cache file; it is passed to the job with the --frame-cache
    argument and added as an input file. A list is taken to be a list of
    frame LFNs whose names have four dash separated fields before the first
    dot, the last two being the start time and duration. The LFNs that lie
    close enough to the data interval of this node are added as input files,
    and --glob-frame-data and --frame-type are set.
    @raise CondorDAGNodeError: if the list is empty or filename is neither
    a string nor a list.
    """
    if isinstance( filename, str ):
      # the name of a lal cache file created by a datafind node
      self.add_var_opt('frame-cache', filename)
      self.add_input_file(filename)
    elif isinstance( filename, list ):
      # we have an LFN list
      self.add_var_opt('glob-frame-data',' ')
      # only add the LFNs that actually overlap with this job
      # XXX FIXME this is a very slow algorithm
      if len(filename) == 0:
        raise CondorDAGNodeError(
          "LDR did not return any LFNs for query: check ifo and frame type")
      for lfn in filename:
        a, b, c, d = lfn.split('.')[0].split('-')
        t_start = int(c)
        t_end = int(c) + int(d)
        # keep a margin of one frame duration plus one second on both sides
        # of the padded data interval
        if (t_start <= (self.get_data_end()+self.get_pad_data()+int(d)+1) \
          and t_end >= (self.get_data_start()-self.get_pad_data()-int(d)-1)):
          self.add_input_file(lfn)
      # set the frame type based on the LFNs returned by datafind
      # (b is the second field of the last LFN in the list)
      self.add_var_opt('frame-type',b)
    else:
      raise CondorDAGNodeError("Unknown LFN cache format")

  def calibration_cache_path(self):
    """
    Determine the path to the correct calibration cache file to use and
    store it; it can be read back with get_calibration().
    @raise CondorDAGNodeError: if the ifo or the start time is not set.
    """
    if self.__ifo and self.__start > 0:
      cal_path = self.job().get_config('calibration','path')

      # check if this is S2: split calibration epochs
      if ( self.__LHO2k.match(self.__ifo) and
        (self.__start >= 729273613) and (self.__start <= 734367613) ):
        if self.__start < int(
          self.job().get_config('calibration','H2-cal-epoch-boundary')):
          cal_file = self.job().get_config('calibration','H2-1')
        else:
          cal_file = self.job().get_config('calibration','H2-2')
      else:
        # if not: just add calibration cache
        cal_file = self.job().get_config('calibration',self.__ifo)

      cal = os.path.join(cal_path,cal_file)
      self.__calibration_cache = cal
    else:
      msg = "IFO and start-time must be set first"
      raise CondorDAGNodeError(msg)

  def calibration(self):
    """
    Set the path to the calibration cache file for the given IFO.
    During S2 the Hanford 2km IFO had two calibration epochs, so
    if the start time is during S2, we use the correct cache file.

    For a DAX the cache file is read and the basename of every file:// URL
    in it is added as an input file; otherwise the cache file itself is
    passed with --calibration-cache and added as an input file.
    @raise IOError: (DAX only) if the cache file cannot be read or one of
    its lines contains no file:// URL.
    """
    # figure out the name of the calibration cache files
    # as specified in the ini-file
    self.calibration_cache_path()

    if self.job().is_dax():
      # new code for DAX
      self.add_var_opt('glob-calibration-data','')
      cache_filename=self.get_calibration()
      pat = re.compile(r'(file://.*)')
      # read the whole cache up front so that the file is closed again
      # before any line is processed
      with open(cache_filename, 'r') as f:
        lines = f.readlines()

      # loop over entries in the cache-file...
      for line in lines:
        m = pat.search(line)
        if not m:
          raise IOError("no file:// URL in calibration cache %s line: %r"
            % (cache_filename, line))
        url = m.group(1)
        # ... and add files to input-file list
        path = urllib.parse.urlparse(url)[2]
        calibration_lfn = os.path.basename(path)
        self.add_input_file(calibration_lfn)
    else:
      # old .calibration for DAG's
      self.add_var_opt('calibration-cache', self.__calibration_cache)
      self.__calibration = self.__calibration_cache
      self.add_input_file(self.__calibration)

  def get_calibration(self):
    """
    Return the calibration cache file to be used by the
    DAG.
    """
    return self.__calibration_cache
2778 2779 2780
class AnalysisChunk:
  """
  An AnalysisChunk is the unit of data that a node works with, usually some
  subset of a ScienceSegment.
  """
  def __init__(self, start, end, trig_start = 0, trig_end = 0):
    """
    @param start: GPS start time of the chunk.
    @param end: GPS end time of the chunk.
    @param trig_start: GPS time at which to start generating triggers
    @param trig_end: GPS time at which to stop generating triggers

    A trig_start or trig_end of zero is treated as "not set" by __repr__
    and __len__.
    """
    self.__start = start
    self.__end = end
    self.__length = end - start
    self.__trig_start = trig_start
    self.__trig_end = trig_end

  def __repr__(self):
    # only mention the trigger times that have been set
    if self.__trig_start and self.__trig_end:
      return '<AnalysisChunk: start %d, end %d, trig_start %d, trig_end %d>' % (
        self.__start, self.__end, self.__trig_start, self.__trig_end)
    elif self.__trig_start and not self.__trig_end:
      return '<AnalysisChunk: start %d, end %d, trig_start %d>' % (
        self.__start, self.__end, self.__trig_start)
    elif not self.__trig_start and self.__trig_end:
      return '<AnalysisChunk: start %d, end %d, trig_end %d>' % (
        self.__start, self.__end, self.__trig_end)
    else:
      return '<AnalysisChunk: start %d, end %d>' % (self.__start, self.__end)

  def __len__(self):
    """
    Returns the length of data for which this AnalysisChunk will produce
    triggers (in seconds). Trigger times that are not set fall back to the
    start or end of the chunk.
    @raise SegmentError: if the resulting length is negative.
    """
    if self.__trig_start and self.__trig_end:
      x = self.__trig_end - self.__trig_start
    elif self.__trig_start and not self.__trig_end:
      x = self.__end - self.__trig_start
    elif not self.__trig_start and self.__trig_end:
      x = self.__trig_end - self.__start
    else:
      x = self.__end - self.__start

    if x < 0:
      # the chunk has no __add__, so format it explicitly for the message
      raise SegmentError(repr(self) + ' has negative length')
    else:
      return x

  def start(self):
    """
    Returns the GPS start time of the chunk.
    """
    return self.__start

  def end(self):
    """
    Returns the GPS end time of the chunk.
    """
    return self.__end

  def dur(self):
    """
    Returns the length (duration) of the chunk in seconds. Unlike len(),
    this is always end - start and ignores the trigger times.
    """
    return self.__length

  def trig_start(self):
    """
    Return the first GPS time at which triggers for this chunk should be
    generated.
    """
    return self.__trig_start

  def trig_end(self):
    """
    Return the last GPS time at which triggers for this chunk should be
    generated.
    """
    return self.__trig_end

  def set_trig_start(self,start):
    """
    Set the first GPS time at which triggers for this chunk should be
    generated.
    """
    self.__trig_start = start

  def set_trig_end(self,end):
    """
    Set the last GPS time at which triggers for this chunk should be
    generated.
    """
    self.__trig_end = end
2876 2877 2878
class ScienceSegment:
  """
  A ScienceSegment is a period of time where the experimenters determine
  that the interferometer is in a state where the data is suitable for
  scientific analysis. A science segment can have a list of AnalysisChunks
  associated with it that break the segment up into (possibly overlapping)
  smaller time intervals for analysis.
  """
  def __init__(self,segment):
    """
    @param segment: a tuple containing the (segment id, gps start time, gps end
    time, duration) of the segment.
    """
    self.__id = segment[0]
    self.__start = segment[1]
    self.__end = segment[2]
    self.__dur = segment[3]
    self.__chunks = []
    # until chunks are made, the whole segment counts as unused
    self.__unused = self.dur()
    self.__ifo = None
    self.__df_node = None

  def __getitem__(self,i):
    """
    Allows iteration over and direct access to the AnalysisChunks contained
    in this ScienceSegment. Negative indices are rejected with IndexError.
    """
    if i < 0: raise IndexError("list index out of range")
    return self.__chunks[i]

  def __len__(self):
    """
    Returns the number of AnalysisChunks contained in this ScienceSegment.
    """
    return len(self.__chunks)

  def __repr__(self):
    return '<ScienceSegment: id %d, start %d, end %d, dur %d, unused %d>' % (
      self.id(),self.start(),self.end(),self.dur(),self.__unused)

  def __cmp__(self,other):
    """
    ScienceSegments are compared by the GPS start time of the segment.
    Returns -1, 0 or 1. Written without the cmp() builtin, which does not
    exist in Python 3.
    """
    mine = self.start()
    theirs = other.start()
    return (mine > theirs) - (mine < theirs)

  def __lt__(self,other):
    """
    Order ScienceSegments by GPS start time. Python 3 ignores __cmp__, so
    this is what allows lists of ScienceSegments to be sorted there.
    """
    return self.start() < other.start()

  def make_chunks(self,length=0,overlap=0,play=0,sl=0,excl_play=0,pad_data=0):
    """
    Divides the science segment into chunks of length seconds overlapped by
    overlap seconds. If the play option is set, only chunks that contain S2
    playground data are generated. If the user has a more complicated way
    of generating chunks, this method should be overridden in a sub-class.
    Any data at the end of the ScienceSegment that is too short to contain a
    chunk is ignored. The length of this unused data is stored and can be
    retrieved with the unused() method.
    @param length: length of chunk in seconds.
    @param overlap: overlap between chunks in seconds.
    @param play: 1 : only generate chunks that overlap with S2 playground data.
                 2 : as play = 1 plus compute trig start and end times to
                     coincide with the start/end of the playground
    @param sl: slide by sl seconds before determining playground data.
    @param excl_play: exclude the first excl_play second from the start and end
    of the chunk when computing if the chunk overlaps with playground.
    @param pad_data: exclude the first and last pad_data seconds of the segment
    when generating chunks
    @raise ValueError: if at least one chunk fits but length <= overlap, since
    the chunk start would then never advance.
    """
    time_left = self.dur() - (2 * pad_data)
    start = self.start() + pad_data
    increment = length - overlap
    if time_left >= length and increment <= 0:
      # without this guard the loop below would never terminate
      raise ValueError(
        "chunk length (%s) must be greater than overlap (%s)" % (length, overlap))
    while time_left >= length:
      end = start + length
      # playground segments start at 729273613 + 6370*k and last 600 seconds
      if (not play) or (play and (((end-sl-excl_play-729273613) % 6370) <
        (600+length-2*excl_play))):
        if (play == 2):
          # calculate the start of the playground preceding the chunk end
          play_start = 729273613 + 6370 * \
            math.floor((end-sl-excl_play-729273613) / 6370)
          play_end = play_start + 600
          trig_start = 0
          trig_end = 0
          if ( (play_end - 6370) > start ):
            print("Two playground segments in this chunk:", end=' ')
            print(" Code to handle this case has not been implemented")
            sys.exit(1)
          else:
            if play_start > start:
              trig_start = int(play_start)
            if play_end < end:
              trig_end = int(play_end)
          self.__chunks.append(AnalysisChunk(start,end,trig_start,trig_end))
        else:
          self.__chunks.append(AnalysisChunk(start,end))
      start += increment
      time_left -= increment
    self.__unused = time_left - overlap

  def add_chunk(self,start,end,trig_start=0,trig_end=0):
    """
    Add an AnalysisChunk to the list associated with this ScienceSegment.
    @param start: GPS start time of chunk.
    @param end: GPS end time of chunk.
    @param trig_start: GPS start time for triggers from chunk
    @param trig_end: GPS end time for triggers from chunk
    """
    self.__chunks.append(AnalysisChunk(start,end,trig_start,trig_end))

  def unused(self):
    """
    Returns the length of data in the science segment not used to make chunks.
    """
    return self.__unused

  def set_unused(self,unused):
    """
    Set the length of data in the science segment not used to make chunks.
    """
    self.__unused = unused

  def id(self):
    """
    Returns the ID of this ScienceSegment.
    """
    return self.__id

  def start(self):
    """
    Returns the GPS start time of this ScienceSegment.
    """
    return self.__start

  def end(self):
    """
    Returns the GPS end time of this ScienceSegment.
    """
    return self.__end

  def set_start(self,t):
    """
    Override the GPS start time (and set the duration) of this ScienceSegment.
    The unused length is not recomputed.
    @param t: new GPS start time.
    """
    self.__dur += self.__start - t
    self.__start = t

  def set_end(self,t):
    """
    Override the GPS end time (and set the duration) of this ScienceSegment.
    The unused length is not recomputed.
    @param t: new GPS end time.
    """
    self.__dur -= self.__end - t
    self.__end = t

  def dur(self):
    """
    Returns the length (duration) in seconds of this ScienceSegment.
    """
    return self.__dur

  def set_df_node(self,df_node):
    """
    Set the DataFind node associated with this ScienceSegment to df_node.
    @param df_node: the DataFind node for this ScienceSegment.
    """
    self.__df_node = df_node

  def get_df_node(self):
    """
    Returns the DataFind node for this ScienceSegment.
    """
    return self.__df_node
3048 3049
3050 -class ScienceData:
3051 """ 3052 An object that can contain all the science data used in an analysis. Can 3053 contain multiple ScienceSegments and has a method to generate these from 3054 a text file produces by the LIGOtools segwizard program. 3055 """
3056 - def __init__(self):
3057 self.__sci_segs = [] 3058 self.__filename = None
3059
3060 - def __getitem__(self,i):
3061 """ 3062 Allows direct access to or iteration over the ScienceSegments associated 3063 with the ScienceData. 3064 """ 3065 return self.__sci_segs[i]
3066
3067 - def __repr__(self):
3068 return '<ScienceData: file %s>' % self.__filename
3069
3070 - def __len__(self):
3071 """ 3072 Returns the number of ScienceSegments associated with the ScienceData. 3073 """ 3074 return len(self.__sci_segs)
3075
3076 - def read(self,filename,min_length,slide_sec=0,buffer=0):
3077 """ 3078 Parse the science segments from the segwizard output contained in file. 3079 @param filename: input text file containing a list of science segments generated by 3080 segwizard. 3081 @param min_length: only append science segments that are longer than min_length. 3082 @param slide_sec: Slide each ScienceSegment by:: 3083 3084 delta > 0: 3085 [s,e] -> [s+delta,e]. 3086 delta < 0: 3087 [s,e] -> [s,e-delta]. 3088 3089 @param buffer: shrink the ScienceSegment:: 3090 3091 [s,e] -> [s+buffer,e-buffer] 3092 """ 3093 self.__filename = filename 3094 octothorpe = re.compile(r'\A#') 3095 for line in open(filename): 3096 if not octothorpe.match(line) and int(line.split()[3]) >= min_length: 3097 (id,st,en,du) = list(map(int,line.split())) 3098 3099 # slide the data if doing a background estimation 3100 if slide_sec > 0: 3101 st += slide_sec 3102 elif slide_sec < 0: 3103 en += slide_sec 3104 du -= abs(slide_sec) 3105 3106 # add a buffer 3107 if buffer > 0: 3108 st += buffer 3109 en -= buffer 3110 du -= 2*abs(buffer) 3111 3112 x = ScienceSegment(tuple([id,st,en,du])) 3113 self.__sci_segs.append(x)
3114
def append_from_tuple(self,seg_tuple):
  """
  Build a ScienceSegment from seg_tuple and add it to the end of the list
  of segments held by this ScienceData.
  @param seg_tuple: tuple passed unchanged to the ScienceSegment constructor.
  """
  self.__sci_segs.append(ScienceSegment(seg_tuple))
3118
3119 - def tama_read(self,filename):
3120 """ 3121 Parse the science segments from a tama list of locked segments contained in 3122 file. 3123 @param filename: input text file containing a list of tama segments. 3124 """ 3125 self.__filename = filename 3126 for line in open(filename): 3127 columns = line.split() 3128 id = int(columns[0]) 3129 start = int(math.ceil(float(columns[3]))) 3130 end = int(math.floor(float(columns[4]))) 3131 dur = end - start 3132 3133 x = ScienceSegment(tuple([id, start, end, dur])) 3134 self.__sci_segs.append(x)
3135 3136
3137 - def make_chunks(self,length,overlap=0,play=0,sl=0,excl_play=0,pad_data=0):
3138 """ 3139 Divide each ScienceSegment contained in this object into AnalysisChunks. 3140 @param length: length of chunk in seconds. 3141 @param overlap: overlap between segments. 3142 @param play: if true, only generate chunks that overlap with S2 playground 3143 data. 3144 @param sl: slide by sl seconds before determining playground data. 3145 @param excl_play: exclude the first excl_play second from the start and end 3146 of the chunk when computing if the chunk overlaps with playground. 3147 """ 3148 for seg in self.__sci_segs: 3149 seg.make_chunks(length,overlap,play,sl,excl_play,pad_data)
3150
3151 - def make_chunks_from_unused(self,length,trig_overlap,play=0,min_length=0, 3152 sl=0,excl_play=0,pad_data=0):
3153 """ 3154 Create an extra chunk that uses up the unused data in the science segment. 3155 @param length: length of chunk in seconds. 3156 @param trig_overlap: length of time start generating triggers before the 3157 start of the unused data. 3158 @param play: 3159 - 1 : only generate chunks that overlap with S2 playground data. 3160 - 2 : as 1 plus compute trig start and end times to coincide 3161 with the start/end of the playground 3162 @param min_length: the unused data must be greater than min_length to make a 3163 chunk. 3164 @param sl: slide by sl seconds before determining playground data. 3165 @param excl_play: exclude the first excl_play second from the start and end 3166 of the chunk when computing if the chunk overlaps with playground. 3167 @param pad_data: exclude the first and last pad_data seconds of the segment 3168 when generating chunks 3169 3170 """ 3171 for seg in self.__sci_segs: 3172 # if there is unused data longer than the minimum chunk length 3173 if seg.unused() > min_length: 3174 end = seg.end() - pad_data 3175 start = end - length 3176 if (not play) or (play and (((end-sl-excl_play-729273613)%6370) < 3177 (600+length-2*excl_play))): 3178 trig_start = end - seg.unused() - trig_overlap 3179 if (play == 2): 3180 # calculate the start of the playground preceeding the chunk end 3181 play_start = 729273613 + 6370 * \ 3182 math.floor((end-sl-excl_play-729273613) / 6370) 3183 play_end = play_start + 600 3184 trig_end = 0 3185 if ( (play_end - 6370) > start ): 3186 print("Two playground segments in this chunk") 3187 print(" Code to handle this case has not been implemented") 3188 sys.exit(1) 3189 else: 3190 if play_start > trig_start: 3191 trig_start = int(play_start) 3192 if (play_end < end): 3193 trig_end = int(play_end) 3194 if (trig_end == 0) or (trig_end > trig_start): 3195 seg.add_chunk(start, end, trig_start, trig_end) 3196 else: 3197 seg.add_chunk(start, end, trig_start) 3198 seg.set_unused(0)
3199
3200 - def make_short_chunks_from_unused( 3201 self,min_length,overlap=0,play=0,sl=0,excl_play=0):
3202 """ 3203 Create a chunk that uses up the unused data in the science segment 3204 @param min_length: the unused data must be greater than min_length to make a 3205 chunk. 3206 @param overlap: overlap between chunks in seconds. 3207 @param play: if true, only generate chunks that overlap with S2 playground data. 3208 @param sl: slide by sl seconds before determining playground data. 3209 @param excl_play: exclude the first excl_play second from the start and end 3210 of the chunk when computing if the chunk overlaps with playground. 3211 """ 3212 for seg in self.__sci_segs: 3213 if seg.unused() > min_length: 3214 start = seg.end() - seg.unused() - overlap 3215 end = seg.end() 3216 length = start - end 3217 if (not play) or (play and (((end-sl-excl_play-729273613)%6370) < 3218 (600+length-2*excl_play))): 3219 seg.add_chunk(start, end, start) 3220 seg.set_unused(0)
3221
3222 - def make_optimised_chunks(self, min_length, max_length, pad_data=0):
3223 """ 3224 Splits ScienceSegments up into chunks, of a given maximum length. 3225 The length of the last two chunks are chosen so that the data 3226 utilisation is optimised. 3227 @param min_length: minimum chunk length. 3228 @param max_length: maximum chunk length. 3229 @param pad_data: exclude the first and last pad_data seconds of the 3230 segment when generating chunks 3231 """ 3232 for seg in self.__sci_segs: 3233 # pad data if requested 3234 seg_start = seg.start() + pad_data 3235 seg_end = seg.end() - pad_data 3236 3237 if seg.unused() > max_length: 3238 # get number of max_length chunks 3239 N = (seg_end - seg_start)/max_length 3240 3241 # split into chunks of max_length 3242 for i in range(N-1): 3243 start = seg_start + (i * max_length) 3244 stop = start + max_length 3245 seg.add_chunk(start, stop) 3246 3247 # optimise data usage for last 2 chunks 3248 start = seg_start + ((N-1) * max_length) 3249 middle = (start + seg_end)/2 3250 seg.add_chunk(start, middle) 3251 seg.add_chunk(middle, seg_end) 3252 seg.set_unused(0) 3253 elif seg.unused() > min_length: 3254 # utilise as single chunk 3255 seg.add_chunk(seg_start, seg_end) 3256 else: 3257 # no chunk of usable length 3258 seg.set_unused(0)
3259
3260 - def intersection(self, other):
3261 """ 3262 Replaces the ScienceSegments contained in this instance of ScienceData 3263 with the intersection of those in the instance other. Returns the number 3264 of segments in the intersection. 3265 @param other: ScienceData to use to generate the intersection 3266 """ 3267 3268 # we only deal with the case of two lists here 3269 length1 = len(self) 3270 length2 = len(other) 3271 3272 # initialize list of output segments 3273 ostart = -1 3274 outlist = [] 3275 iseg2 = -1 3276 start2 = -1 3277 stop2 = -1 3278 3279 for seg1 in self: 3280 start1 = seg1.start() 3281 stop1 = seg1.end() 3282 id = seg1.id() 3283 3284 # loop over segments from the second list which overlap this segment 3285 while start2 < stop1: 3286 if stop2 > start1: 3287 # these overlap 3288 3289 # find the overlapping range 3290 if start1 < start2: 3291 ostart = start2 3292 else: 3293 ostart = start1 3294 if stop1 > stop2: 3295 ostop = stop2 3296 else: 3297 ostop = stop1 3298 3299 x = ScienceSegment(tuple([id, ostart, ostop, ostop-ostart])) 3300 outlist.append(x) 3301 3302 if stop2 > stop1: 3303 break 3304 3305 # step forward 3306 iseg2 += 1 3307 if iseg2 < len(other): 3308 seg2 = other[iseg2] 3309 start2 = seg2.start() 3310 stop2 = seg2.end() 3311 else: 3312 # pseudo-segment in the far future 3313 start2 = 2000000000 3314 stop2 = 2000000000 3315 3316 # save the intersection and return the length 3317 self.__sci_segs = outlist 3318 return len(self)
3319 3320 3321
def union(self, other):
  """
  Replaces the ScienceSegments contained in this instance of ScienceData
  with the union of those in the instance other. Returns the number of
  ScienceSegments in the union.

  The two lists are merged by repeatedly taking whichever pending segment
  starts earlier, and overlapping or touching segments are joined. Every
  output segment is given the id most recently read from this instance's
  list (-1 if none has been read yet).
  @param other: ScienceData to use to generate the union
  """

  # we only deal with the case of two lists here
  length1 = len(self)
  length2 = len(other)

  # initialize list of output segments
  # NOTE(review): -1 is used as the "nothing pending" marker for ostart,
  # start1 and start2, so this presumably assumes non-negative start times
  # and lists already ordered by start time - confirm
  ostart = -1
  seglist = []

  i1 = -1
  i2 = -1
  start1 = -1
  start2 = -1
  id = -1

  while 1:
    # if necessary, get a segment from list 1
    if start1 == -1:
      i1 += 1
      if i1 < length1:
        start1 = self[i1].start()
        stop1 = self[i1].end()
        id = self[i1].id()
      elif i2 == length2:
        break

    # if necessary, get a segment from list 2
    if start2 == -1:
      i2 += 1
      if i2 < length2:
        start2 = other[i2].start()
        stop2 = other[i2].end()
      elif i1 == length1:
        break

    # pick the earlier segment from the two lists
    if start1 > -1 and ( start2 == -1 or start1 <= start2):
      ustart = start1
      ustop = stop1
      # mark this segment as having been consumed
      start1 = -1
    elif start2 > -1:
      ustart = start2
      ustop = stop2
      # mark this segment as having been consumed
      start2 = -1
    else:
      # neither list has a pending segment left
      break

    # if the output segment is blank, initialize it; otherwise, see
    # whether the new segment extends it or is disjoint
    if ostart == -1:
      ostart = ustart
      ostop = ustop
    elif ustart <= ostop:
      if ustop > ostop:
        # this extends the output segment
        ostop = ustop
      else:
        # This lies entirely within the current output segment
        pass
    else:
      # flush the current output segment, and replace it with the
      # new segment
      x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart]))
      seglist.append(x)
      ostart = ustart
      ostop = ustop

  # flush out the final output segment (if any)
  if ostart != -1:
    x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart]))
    seglist.append(x)

  self.__sci_segs = seglist
  return len(self)
3405 3406
3407 - def coalesce(self):
3408 """ 3409 Coalesces any adjacent ScienceSegments. Returns the number of 3410 ScienceSegments in the coalesced list. 3411 """ 3412 3413 # check for an empty list 3414 if len(self) == 0: 3415 return 0 3416 3417 # sort the list of science segments 3418 self.__sci_segs.sort() 3419 3420 # coalesce the list, checking each segment for validity as we go 3421 outlist = [] 3422 ostop = -1 3423 3424 for seg in self: 3425 start = seg.start() 3426 stop = seg.end() 3427 id = seg.id() 3428 if start > ostop: 3429 # disconnected, so flush out the existing segment (if any) 3430 if ostop >= 0: 3431 x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart])) 3432 outlist.append(x) 3433 ostart = start 3434 ostop = stop 3435 elif stop > ostop: 3436 # extend the current segment 3437 ostop = stop 3438 3439 # flush out the final segment (if any) 3440 if ostop >= 0: 3441 x = ScienceSegment(tuple([id,ostart,ostop,ostop-ostart])) 3442 outlist.append(x) 3443 3444 self.__sci_segs = outlist 3445 return len(self)
3446 3447
3448 - def invert(self):
3449 """ 3450 Inverts the ScienceSegments in the class (i.e. set NOT). Returns the 3451 number of ScienceSegments after inversion. 3452 """ 3453 3454 # check for an empty list 3455 if len(self) == 0: 3456 # return a segment representing all time 3457 self.__sci_segs = ScienceSegment(tuple([0,0,1999999999,1999999999])) 3458 3459 # go through the list checking for validity as we go 3460 outlist = [] 3461 ostart = 0 3462 for seg in self: 3463 start = seg.start() 3464 stop = seg.end() 3465 if start < 0 or stop < start or start < ostart: 3466 raise SegmentError("Invalid list") 3467 if start > 0: 3468 x =