Package glue :: Module dagfile

Source Code for Module glue.dagfile

# Copyright (C) 2011--2015  Kipp Cannon
# Copyright (C) 2004--2006  Brian Moe
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

 18  """ 
 19  Machinery for reading, editing, and writing Condor DAG files. 
 20   
 21  When running DAGs on Condor compute clusters, very often one will wish to 
 22  re-run a portion of a DAG.  This can be done by marking all jobs except the 
 23  ones to be re-run as "DONE".  Unfortunately the Condor software suite lacks 
 24  an I/O library for reading and writing Condor DAG files, so there is no 
 25  easy way to edit DAG files except by playing games sed, awk, or once-off 
 26  Python or Perl scripts.  That's where this module comes in.  This module 
 27  will read a DAG file into an in-ram representation that is easily edited, 
 28  and allow the file to be written to disk again. 
 29   
 30  Example: 
 31   
 32  >>> from glue import dagfile 
 33  >>> dag = dagfile.DAG.parse(open("pipeline.dag")) 
 34  >>> dag.write(open("pipeline.dag", "w")) 
 35   
 36  Although it is possible to machine-generate an original DAG file using this 
 37  module and write it to disk, this module does not provide the tools 
 38  required to do any of the other tasks associated with pipeline 
 39  construction.  For example there is no facility here to generate or manage 
 40  submit files, data files, or any other files that are associated with a 
 41  full pipeline.  Only the DAG file itself is considered here.  For general 
 42  pipeline construction see the pipeline module.  The focus of this module is 
 43  on editing existing DAG files. 
 44   
 45  Developers should also consider doing any new pipeline development using 
 46  DAX files as the fundamental workflow description, instead of DAGs.  See 
 47  http://pegasus.isi.edu for more information. 
 48   
 49  A DAG file is loaded using the .parse() class method of the DAG class. 
 50  This parses the file-like object passed to it and returns an instance of 
 51  the DAG class representing the file's contents.  Once loaded, the nodes in 
 52  the DAG can all be found in the .nodes dictionary, whose keys are the node 
 53  names and whose values are the corresponding node objects.  Among each node 
 54  object's attributes are sets .children and .parents containing references 
 55  to the child and parent nodes (not their names) for each node.  Note that 
 56  every node must appear listed as a parent of each of its children, and vice 
 57  versa.  The other attributes of a DAG instance contain information about 
 58  the DAG, for example the CONFIG file or the DOT file, and so on.  All of 
 59  the data for each node in the DAG, for example the node's VARS value, its 
 60  initial working directory, and so on, can be found in the attributes of the 
 61  nodes themselves.  A DAG is written to a file using the .write() method of 
 62  the DAG object. 
 63  """ 


#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#


import re


__all__ = ["DAG", "JOB", "DATA", "SPLICE", "SUBDAG_EXTERNAL"]


#
# =============================================================================
#
#                               Progress Wrapper
#
# =============================================================================
#


class progress_wrapper(object):
    """
    Progress report wrapper.  For internal use only.
    """
    def __init__(self, f, callback):
        self.n = 0
        self.f = f
        self.callback = callback

    def __iadd__(self, dn):
        self.n += dn
        if self.callback is not None and not self.n % 7411:
            self.callback(self.f, self.n, False)
        return self

    def __del__(self):
        if self.callback is not None:
            self.callback(self.f, self.n, True)


class nofile(object):
    """
    Object providing a no-op .write() method to fake a file.  For
    internal use only.
    """
    def write(self, *args):
        pass


#
# =============================================================================
#
#                    The Contents of a Condor DAG File
#
# =============================================================================
#


class JOB(object):
    """
    Representation of a JOB node in a Condor DAG.  JOB objects have the
    following attributes corresponding to information in the DAG file:

    .name
        The name of the node in the DAG.

    .filename
        The name of the submit file for the JOB.

    .directory
        The initial working directory for the JOB.  Set to None to
        omit from DAG (job's working directory will be chosen by
        Condor).

    .done
        Boolean indicating if the JOB is done or not.  See
        DAG.load_rescue() for more information.

    .noop
        Boolean indicating if the JOB is a no-op or not.

    .vars
        A dictionary of the name-->value pairs in the VARS line for
        the JOB.  Leave empty to omit VARS from DAG.

    .retry
        The number of retries for the job.  Set to None to omit
        from DAG.

    .retry_unless_exit_value
        The value of the UNLESS-EXIT suffix of the RETRY line.
        Set to None to omit from DAG.

    .priority
    .category
        The PRIORITY value and CATEGORY name for the node in the
        DAG.  Set to None to omit from the DAG.

    .parents
    .children
        Sets of the parent and child nodes of JOB.  The sets
        contain references to the node objects, not their names.

    .prescript
    .prescriptargs
    .postscript
    .postscriptargs
        The names and lists of arguments of the PRE and POST
        scripts.  Set to None to omit from DAG.

    .abort_dag_on_abortexitvalue
    .abort_dag_on_dagreturnvalue
        The ABORT-DAG-ON abort exit value and DAG return value for
        the JOB.  Set to None to omit from DAG.

    For more information about the function of these parameters, refer
    to the Condor documentation.
    """
    keyword = "JOB"

    def __init__(self, name, filename, directory = None, done = False, noop = False):
        # information from the JOB line in the DAG file
        self.name = name
        self.filename = filename
        self.directory = directory
        self.done = done
        self.noop = noop

        # the VARS line in the DAG file.  orderless name, value pairs
        self.vars = {}

        # the RETRY line in the DAG file
        self.retry = None
        self.retry_unless_exit_value = None

        # the PRIORITY and CATEGORY lines in the DAG file
        self.priority = None
        self.category = None

        # the parents and children of this node.  the sets contain
        # references to the parent and child objects, not their names
        self.parents = set()
        self.children = set()

        # the names and arguments of the PRE and POST scripts, if any
        self.prescript = None
        self.prescriptargs = None
        self.postscript = None
        self.postscriptargs = None

        # the ABORT-DAG-ON abort exit value and dag return value for
        # this job if they are set, or None if not
        self.abort_dag_on_abortexitvalue = None
        self.abort_dag_on_dagreturnvalue = None

    def write(self, f, progress = None):
        """
        Write the lines describing this node to the file-like
        object f.  The object must provide a .write() method.

        If progress is not None, it will be incremented by 1 for
        every line written.
        """
        # JOB ...
        f.write("%s %s %s" % (self.keyword, self.name, self.filename))
        if self.directory is not None:
            f.write(" DIR \"%s\"" % self.directory)
        if self.noop:
            f.write(" NOOP")
        if self.done:
            f.write(" DONE")
        f.write("\n")
        if progress is not None:
            progress += 1

        # PRIORITY ...
        if self.priority:
            f.write("PRIORITY %s %d\n" % (self.name, self.priority))
            if progress is not None:
                progress += 1

        # CATEGORY ...
        if self.category is not None:
            f.write("CATEGORY %s %s\n" % (self.name, self.category))
            if progress is not None:
                progress += 1

        # RETRY ...
        if self.retry:
            f.write("RETRY %s %d" % (self.name, self.retry))
            if self.retry_unless_exit_value is not None:
                f.write(" UNLESS-EXIT %d" % self.retry_unless_exit_value)
            f.write("\n")
            if progress is not None:
                progress += 1

        # VARS ...
        if self.vars:
            f.write("VARS %s" % self.name)
            for name, value in sorted(self.vars.items()):
                # apply escape rules to the value
                f.write(" %s=\"%s\"" % (name, value.replace("\\", "\\\\").replace("\"", "\\\"")))
            f.write("\n")
            if progress is not None:
                progress += 1

        # SCRIPT PRE ...
        if self.prescript is not None:
            f.write("SCRIPT PRE %s %s" % (self.name, self.prescript))
            if self.prescriptargs:
                f.write(" %s" % " ".join(self.prescriptargs))
            f.write("\n")
            if progress is not None:
                progress += 1

        # SCRIPT POST ...
        if self.postscript is not None:
            f.write("SCRIPT POST %s %s" % (self.name, self.postscript))
            if self.postscriptargs:
                f.write(" %s" % " ".join(self.postscriptargs))
            f.write("\n")
            if progress is not None:
                progress += 1

        # ABORT-DAG-ON ...
        if self.abort_dag_on_abortexitvalue is not None:
            f.write("ABORT-DAG-ON %s %d" % (self.name, self.abort_dag_on_abortexitvalue))
            if self.abort_dag_on_dagreturnvalue is not None:
                f.write(" RETURN %d" % self.abort_dag_on_dagreturnvalue)
            f.write("\n")
            if progress is not None:
                progress += 1

    # state
    @property
    def state(self):
        """
        Get the state of the node.  One of 'wait', 'idle', 'run',
        'abort', 'stop', 'success', 'fail'.

        NOTE:  this feature is not implemented at this time.
        """
        raise NotImplementedError
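The escape rules applied to VARS values in JOB.write() (and undone again when a DAG file is parsed) can be exercised in isolation.  This sketch simply mirrors the two .replace() chains used in the module: backslashes are doubled first, then double quotes are backslash-escaped, and the inverse is applied in the same order:

```python
def escape(value):
    # backslashes first, then double quotes (order matters)
    return value.replace("\\", "\\\\").replace("\"", "\\\"")

def unescape(value):
    # inverse of escape(), applied in the order the DAG parser uses
    return value.replace("\\\\", "\\").replace("\\\"", "\"")

original = 'name with "quotes" and \\ backslash'
roundtrip = unescape(escape(original))
```

Doing the replacements in the opposite order would corrupt values containing both characters, which is why the order above matters.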


class DATA(JOB):
    """
    Representation of a Stork DATA node in a Condor DAG.
    """
    keyword = "DATA"


class SUBDAG_EXTERNAL(JOB):
    """
    Representation of a SUBDAG EXTERNAL node in a Condor DAG.
    """
    keyword = "SUBDAG EXTERNAL"


class SPLICE(JOB):
    """
    Representation of a SPLICE node in a Condor DAG.
    """
    # NOTE:  although this is a subclass of the JOB class, splices
    # don't support most of the things that can be associated with
    # jobs, like VARS and so on, so don't set attributes that
    # shouldn't be set or you'll get a nonsense DAG.  In the future,
    # more error checking might be added to prevent mis-use.
    keyword = "SPLICE"


class DAG(object):
    """
    Representation of the contents of a Condor DAG file.

    BUGS:  the semantics of the "+" special character in category names
    is not understood.  For now, it is an error for a node's category
    to not be found verbatim in a MAXJOBS line.  The "+" character is a
    wildcard-like character used in the assignment of MAXJOBS values to
    job categories in splices;  see the Condor documentation for more
    information.
    """

    #
    # lines in DAG files
    #

    dotpat = re.compile(r'^DOT\s+(?P<filename>\S+)(\s+(?P<options>.+))?', re.IGNORECASE)
    jobpat = re.compile(r'^JOB\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
    datapat = re.compile(r'^DATA\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
    subdagpat = re.compile(r'^SUBDAG\s+EXTERNAL\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
    splicepat = re.compile(r'^SPLICE\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?', re.IGNORECASE)
    prioritypat = re.compile(r'^PRIORITY\s+(?P<name>\S+)\s+(?P<value>\S+)', re.IGNORECASE)
    categorypat = re.compile(r'^CATEGORY\s+(?P<name>\S+)\s+(?P<category>\S+)', re.IGNORECASE)
    retrypat = re.compile(r'^RETRY\s+(?P<name>\S+)\s+(?P<retries>\S+)(\s+UNLESS-EXIT\s+(?P<retry_unless_exit_value>\S+))?', re.IGNORECASE)
    varspat = re.compile(r'^VARS\s+(?P<name>\S+)\s+(?P<vars>.+)', re.IGNORECASE)
    varsvaluepat = re.compile(r'(?P<name>\S+)\s*=\s*"(?P<value>.*?)(?<!\\)"', re.IGNORECASE)
    scriptpat = re.compile(r'^SCRIPT\s+(?P<type>(PRE)|(POST))\s(?P<name>\S+)\s+(?P<executable>\S+)(\s+(?P<arguments>.+))?', re.IGNORECASE)
    abortdagonpat = re.compile(r'^ABORT-DAG-ON\s+(?P<name>\S+)\s+(?P<exitvalue>\S+)(\s+RETURN\s+(?P<returnvalue>\S+))?', re.IGNORECASE)
    arcpat = re.compile(r'^PARENT\s+(?P<parents>.+?)\s+CHILD\s+(?P<children>.+)', re.IGNORECASE)
    maxjobspat = re.compile(r'^MAXJOBS\s+(?P<category>\S+)\s+(?P<value>\S+)', re.IGNORECASE)
    configpat = re.compile(r'^CONFIG\s+(?P<filename>\S+)', re.IGNORECASE)
    nodestatuspat = re.compile(r'^NODE_STATUS_FILE\s+(?P<filename>\S+)(\s+(?P<updatetime>\S+))?', re.IGNORECASE)
    jobstatepat = re.compile(r'^JOBSTATE_LOG\s+(?P<filename>\S+)', re.IGNORECASE)

    #
    # lines in rescue DAG files
    #

    donepat = re.compile(r'^DONE\s+(?P<name>\S+)', re.IGNORECASE)

    #
    # methods
    #
    def __init__(self):
        # node name --> JOB object mapping
        self.nodes = {}
        # category name --> integer max jobs value mapping.  all
        # categories are listed, that is it is an error for a JOB
        # in the DAG to claim to be in a category that cannot be
        # found in this dictionary.  categories that don't have a
        # MAXJOBS set for them use None as their max jobs value in
        # this dictionary.
        self.maxjobs = {}
        # filename or None
        self.config = None
        # filename or None
        self.dot = None
        # booleans, defaults match Condor's
        self.dotupdate = False
        self.dotoverwrite = True
        # filename or None
        self.dotinclude = None
        # filename and update time or None
        self.node_status_file = None
        self.node_status_file_updatetime = None
        # filename or None
        self.jobstate_log = None

    def reindex(self):
        """
        Rebuild the .nodes index.  This is required if the names of
        nodes are changed.
        """
        # the .nodes object has its contents replaced instead of
        # building a new object so that if external code is holding
        # a reference to it that code sees the new index as well
        nodes = dict((node.name, node) for node in self.nodes.values())
        if len(nodes) != len(self.nodes):
            raise ValueError("node names are not unique")
        self.nodes.clear()
        self.nodes.update(nodes)

    @classmethod
    def parse(cls, f, progress = None):
        """
        Parse the file-like object f as a Condor DAG file.  Return
        a DAG object.  The file object must be iterable, yielding
        one line of text of the DAG file in each iteration.

        If the progress argument is not None, it should be a
        callable object.  This object will be called periodically
        and passed the f argument, the current line number, and a
        boolean indicating if parsing is complete.  The boolean is
        always False until parsing is complete, then the callable
        will be invoked one last time with the final line count and
        the boolean set to True.

        Example:

        >>> def progress(f, n, done):
        ...     print("reading %s: %d lines\\r" % (f.name, n), end = "")
        ...     if done:
        ...         print()
        ...
        >>> dag = DAG.parse(open("pipeline.dag"), progress = progress)
        """
        progress = progress_wrapper(f, progress)
        self = cls()
        arcs = []
        for n, line in enumerate(f, start = 1):
            # progress
            progress += 1
            # skip comments and blank lines
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # JOB ...
            m = self.jobpat.search(line)
            if m is not None:
                if m.group("name") in self.nodes:
                    raise ValueError("line %d: duplicate JOB %s" % (n, m.group("name")))
                self.nodes[m.group("name")] = JOB(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
                continue
            # DATA ...
            m = self.datapat.search(line)
            if m is not None:
                if m.group("name") in self.nodes:
                    raise ValueError("line %d: duplicate DATA %s" % (n, m.group("name")))
                self.nodes[m.group("name")] = DATA(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
                continue
            # SUBDAG EXTERNAL ...
            m = self.subdagpat.search(line)
            if m is not None:
                if m.group("name") in self.nodes:
                    raise ValueError("line %d: duplicate SUBDAG EXTERNAL %s" % (n, m.group("name")))
                self.nodes[m.group("name")] = SUBDAG_EXTERNAL(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
                continue
            # SPLICE ...
            m = self.splicepat.search(line)
            if m is not None:
                if m.group("name") in self.nodes:
                    raise ValueError("line %d: duplicate SPLICE %s" % (n, m.group("name")))
                self.nodes[m.group("name")] = SPLICE(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""))
                continue
            # VARS ...
            m = self.varspat.search(line)
            if m is not None:
                node = self.nodes[m.group("name")]
                # FIXME:  find a way to detect malformed name=value pairs
                for name, value in self.varsvaluepat.findall(m.group("vars")):
                    if name in node.vars:
                        raise ValueError("line %d: multiple variable %s for %s %s" % (n, name, node.keyword, node.name))
                    # apply unescape rules to the value
                    node.vars[name] = value.replace("\\\\", "\\").replace("\\\"", "\"")
                continue
            # PARENT ... CHILD ...
            m = self.arcpat.search(line)
            if m is not None:
                parents = m.group("parents").strip().split()
                children = m.group("children").strip().split()
                arcs.extend((parent, child) for parent in parents for child in children)
                continue
            # RETRY ...
            m = self.retrypat.search(line)
            if m is not None:
                node = self.nodes[m.group("name")]
                node.retry = int(m.group("retries"))
                # convert to int so JOB.write()'s "%d" format works
                if m.group("retry_unless_exit_value") is not None:
                    node.retry_unless_exit_value = int(m.group("retry_unless_exit_value"))
                continue
            # SCRIPT ...
            m = self.scriptpat.search(line)
            if m is not None:
                node = self.nodes[m.group("name")]
                if m.group("type").upper() == "PRE":
                    if node.prescript is not None:
                        raise ValueError("line %d: multiple SCRIPT PRE for %s %s" % (n, node.keyword, node.name))
                    node.prescript = m.group("executable")
                    if m.group("arguments") is not None:
                        node.prescriptargs = m.group("arguments").split()
                elif m.group("type").upper() == "POST":
                    if node.postscript is not None:
                        raise ValueError("line %d: multiple SCRIPT POST for %s %s" % (n, node.keyword, node.name))
                    node.postscript = m.group("executable")
                    if m.group("arguments") is not None:
                        node.postscriptargs = m.group("arguments").split()
                else:
                    assert False  # impossible to get here
                continue
            # PRIORITY ...
            m = self.prioritypat.search(line)
            if m is not None:
                node = self.nodes[m.group("name")]
                if node.priority is not None:
                    raise ValueError("line %d: multiple PRIORITY for %s %s" % (n, node.keyword, node.name))
                node.priority = int(m.group("value"))
                continue
            # CATEGORY ...
            m = self.categorypat.search(line)
            if m is not None:
                self.nodes[m.group("name")].category = m.group("category")
                continue
            # ABORT-DAG-ON ...
            m = self.abortdagonpat.search(line)
            if m is not None:
                node = self.nodes[m.group("name")]
                if node.abort_dag_on_abortexitvalue is not None:
                    raise ValueError("line %d: multiple ABORT-DAG-ON for %s %s" % (n, node.keyword, node.name))
                node.abort_dag_on_abortexitvalue = int(m.group("exitvalue"))
                if m.group("returnvalue") is not None:
                    node.abort_dag_on_dagreturnvalue = int(m.group("returnvalue"))
                continue
            # MAXJOBS ...
            m = self.maxjobspat.search(line)
            if m is not None:
                if m.group("category") in self.maxjobs:
                    raise ValueError("line %d: multiple MAXJOBS for category %s" % (n, m.group("category")))
                self.maxjobs[m.group("category")] = int(m.group("value"))
                continue
            # DOT ...
            m = self.dotpat.search(line)
            if m is not None:
                self.dot = m.group("filename")
                options = (m.group("options") or "").split()
                while options:
                    option = options.pop(0).upper()
                    if option == "UPDATE":
                        self.dotupdate = True
                    elif option == "DONT-UPDATE":
                        self.dotupdate = False
                    elif option == "OVERWRITE":
                        self.dotoverwrite = True
                    elif option == "DONT-OVERWRITE":
                        self.dotoverwrite = False
                    elif option == "INCLUDE":
                        try:
                            self.dotinclude = options.pop(0)
                        except IndexError:
                            raise ValueError("line %d: missing filename for INCLUDE option of DOT" % n)
                    else:
                        raise ValueError("line %d: unrecognized option %s for DOT" % (n, option))
                continue
            # CONFIG ...
            m = self.configpat.search(line)
            if m is not None:
                if self.config is not None:
                    raise ValueError("line %d: multiple CONFIG lines in dag file" % n)
                self.config = m.group("filename")
                continue
            # NODE_STATUS_FILE ...
            m = self.nodestatuspat.search(line)
            if m is not None:
                if self.node_status_file is not None:
                    raise ValueError("line %d: multiple NODE_STATUS_FILE lines in dag file" % n)
                self.node_status_file = m.group("filename")
                if m.group("updatetime") is not None:
                    self.node_status_file_updatetime = int(m.group("updatetime"))
                continue
            # JOBSTATE_LOG ...
            m = self.jobstatepat.search(line)
            if m is not None:
                # dagman allows more than one of these statements,
                # ignoring all but the first
                if self.jobstate_log is None:
                    self.jobstate_log = m.group("filename")
                continue
            # error
            raise ValueError("line %d: invalid line in dag file: %s" % (n, line))
        # progress
        del progress
        # populate parent and child sets
        for parent, child in arcs:
            self.nodes[parent].children.add(self.nodes[child])
            self.nodes[child].parents.add(self.nodes[parent])
        # make sure all categories are known
        for node in self.nodes.values():
            if node.category is not None and node.category not in self.maxjobs:
                self.maxjobs[node.category] = None
        # done
        return self
    @classmethod
    def select_nodes_by_name(cls, dag, nodenames):
        """
        Construct a new DAG object containing only the nodes whose
        names are in nodenames.

        Example:

        >>> names_to_rerun = set(["triggergen"])
        >>> dag = DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun))

        NOTE:  the new DAG object is given references to the node
        (JOB, DATA, etc.) objects in the original DAG, not copies
        of them.  Therefore, editing the node objects, for example
        modifying their parent or child sets, will affect both
        DAGs.  To obtain an independent DAG with its own node
        objects, make a deepcopy of the object that is returned
        (see the copy module in the Python standard library for
        more information).

        Example:

        >>> import copy
        >>> dag = copy.deepcopy(DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun)))
        """
        self = cls()
        self.nodes = dict((name, node) for name, node in dag.nodes.items() if name in nodenames)
        self.maxjobs = dict((category, dag.maxjobs[category]) for category in set(node.category for node in self.nodes.values() if node.category is not None))
        self.config = dag.config
        self.node_status_file = dag.node_status_file
        self.node_status_file_updatetime = dag.node_status_file_updatetime
        self.jobstate_log = dag.jobstate_log
        self.dot = dag.dot
        self.dotupdate = dag.dotupdate
        self.dotoverwrite = dag.dotoverwrite
        self.dotinclude = dag.dotinclude
        return self
    def get_all_parent_names(self, names):
        """
        Trace the DAG backward from the parents of the nodes whose
        names are given to the head nodes, inclusively, and return
        the set of the names of all nodes visited.

        Example:

        >>> all_parents = dag.get_all_parent_names(["triggergen"])
        """
        all_parent_names = set()
        nodes_to_scan = set(self.nodes[name] for name in names)
        while nodes_to_scan:
            node = nodes_to_scan.pop()
            nodes_to_scan |= node.parents
            all_parent_names |= set(parent.name for parent in node.parents)
        return all_parent_names
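The backward trace above can be run on a toy graph of plain objects to see how it behaves.  The Node class below is a minimal, hypothetical stand-in for the JOB class (just a name and parent links), and the loop is the same one the method uses:

```python
class Node(object):
    # minimal stand-in for the JOB class:  a name and a parent set
    def __init__(self, name):
        self.name = name
        self.parents = set()

# three-node chain:  a --> b --> c
a, b, c = Node("a"), Node("b"), Node("c")
b.parents.add(a)
c.parents.add(b)
nodes = {n.name: n for n in (a, b, c)}

def get_all_parent_names(nodes, names):
    # same traversal as DAG.get_all_parent_names(), on the toy graph
    all_parent_names = set()
    nodes_to_scan = set(nodes[name] for name in names)
    while nodes_to_scan:
        node = nodes_to_scan.pop()
        nodes_to_scan |= node.parents
        all_parent_names |= set(parent.name for parent in node.parents)
    return all_parent_names
```

Because only DAGs (no cycles) are handled, the scan set always shrinks toward the head nodes and the loop terminates.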
    def get_all_child_names(self, names):
        """
        Trace the DAG forward from the children of the nodes whose
        names are given to the leaf nodes, inclusively, and return
        the set of the names of all nodes visited.

        Example:

        >>> all_children = dag.get_all_child_names(["triggergen"])
        """
        all_child_names = set()
        nodes_to_scan = set(self.nodes[name] for name in names)
        while nodes_to_scan:
            node = nodes_to_scan.pop()
            nodes_to_scan |= node.children
            all_child_names |= set(child.name for child in node.children)
        return all_child_names
    def check_edges(self):
        """
        Check all graph edges for validity.  Checks that each of
        every node's children lists that node as a parent, and vice
        versa, and that all nodes listed in the parent and child
        sets of all nodes are contained in this DAG.  Raises
        ValueError if a problem is found, otherwise returns None.

        Example:

        >>> try:
        ...     dag.check_edges()
        ... except ValueError as e:
        ...     print("edges are broken: %s" % str(e))
        ... else:
        ...     print("all edges are OK")
        ...
        """
        nodes = set(self.nodes.values())
        for node in nodes:
            for child in node.children:
                if node not in child.parents:
                    raise ValueError("node %s is not a parent of its child %s" % (node.name, child.name))
                if child not in nodes:
                    raise ValueError("node %s has child %s that is not in DAG" % (node.name, child.name))
            for parent in node.parents:
                if node not in parent.children:
                    raise ValueError("node %s is not a child of its parent %s" % (node.name, parent.name))
                if parent not in nodes:
                    raise ValueError("node %s has parent %s that is not in DAG" % (node.name, parent.name))
    def load_rescue(self, f, progress = None):
        """
        Parse the file-like object f as a rescue DAG, using the
        DONE lines therein to set the job states of this DAG.

        In the past, rescue DAGs were full copies of the original
        DAG with the word DONE added to the JOB lines of completed
        jobs.  In version 7.7.2 of Condor, the default format of
        rescue DAGs was changed to a condensed format consisting of
        only the names of completed jobs and the number of retries
        remaining for incomplete jobs.  Currently Condor still
        supports the original rescue DAG format, but the user must
        set the DAGMAN_WRITE_PARTIAL_RESCUE config variable to
        false to obtain one.  This module does not directly support
        the new format, however this method allows a new-style
        rescue DAG to be parsed to set the states of the jobs in a
        DAG.  This, in effect, converts a new-style rescue DAG to
        an old-style rescue DAG, allowing the result to be
        manipulated as before.

        If the progress argument is not None, it should be a
        callable object.  This object will be called periodically
        and passed the f argument, the current line number, and a
        boolean indicating if parsing is complete.  The boolean is
        always False until parsing is complete, then the callable
        will be invoked one last time with the final line count and
        the boolean set to True.
        """
        # set all jobs to "not done"
        for job in self.nodes.values():
            job.done = False
        # now load rescue DAG, updating done and retries states
        progress = progress_wrapper(f, progress)
        for n, line in enumerate(f, start = 1):
            # progress
            progress += 1
            # skip comments and blank lines
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # DONE ...
            m = self.donepat.search(line)
            if m is not None:
                self.nodes[m.group("name")].done = True
                continue
            # RETRY ...
            m = self.retrypat.search(line)
            if m is not None:
                node = self.nodes[m.group("name")]
                node.retry = int(m.group("retries"))
                # convert to int so JOB.write()'s "%d" format works
                if m.group("retry_unless_exit_value") is not None:
                    node.retry_unless_exit_value = int(m.group("retry_unless_exit_value"))
                continue
            # error
            raise ValueError("line %d: invalid line in rescue file: %s" % (n, line))
        # progress
        del progress
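The condensed rescue-DAG format consumed above is essentially a list of DONE lines (plus RETRY lines).  The matching step can be sketched on its own with the same regular expression as the donepat class attribute, over a few hypothetical rescue-file lines:

```python
import re

# same pattern as DAG.donepat
donepat = re.compile(r'^DONE\s+(?P<name>\S+)', re.IGNORECASE)

# hypothetical condensed rescue DAG contents
rescue_lines = [
    "# rescue DAG written by condor_dagman",
    "DONE inspiral",
    "done triggergen",   # keywords are matched case-insensitively
    "RETRY plotting 2",  # incomplete job:  not marked done
]

done_names = set()
for line in rescue_lines:
    m = donepat.search(line.strip())
    if m is not None:
        done_names.add(m.group("name"))
```

In load_rescue() each matched name sets .done = True on the corresponding node, after first clearing the done flag on every node.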
    def write(self, f, progress = None, rescue = None):
        """
        Write the DAG to the file-like object f.  The object must
        provide a .write() method.  In the special case that the
        optional rescue argument is not None (see below) then f can
        be set to None and no DAG file will be written (just the
        rescue DAG will be written).

        If the progress argument is not None, it should be a
        callable object.  This object will be called periodically
        and passed the f argument, the current line number, and a
        boolean indicating if writing is complete.  The boolean is
        always False until writing is complete, then the callable
        will be invoked one last time with the final line count and
        the boolean set to True.

        Example:

        >>> def progress(f, n, done):
        ...     print("writing %s: %d lines\\r" % (f.name, n), end = "")
        ...     if done:
        ...         print()
        ...
        >>> dag.write(open("pipeline.dag", "w"), progress = progress)

        NOTE:  when writing PARENT/CHILD graph edges, this method
        will silently skip any node names that are not in this
        DAG's graph.  This is a convenience to simplify writing
        DAGs constructed by the .select_nodes_by_name() class
        method.  If one wishes to check for broken parent/child
        links before writing the DAG, use the .check_edges()
        method.

        If the optional rescue argument is not None, it must be a
        file-like object providing a .write() method, and the DONE
        state of jobs will be written to this file instead of the
        .dag (in the .dag all jobs will be marked not done).

        Example:

        >>> dag.write(open("pipeline.dag", "w"), rescue = open("pipeline.dag.rescue001", "w"))

        NOTE:  it is left as an exercise for the calling code to
        ensure the name chosen for the rescue file is consistent
        with the naming convention assumed by condor_dagman when it
        starts up.
        """
        # initialize progress report wrapper
        progress = progress_wrapper(f, progress)

        # if needed, create a dummy object to allow .write() method
        # calls
        if f is None and rescue is not None:
            f = nofile()

        # DOT ...
        if self.dot is not None:
            f.write("DOT %s" % self.dot)
            if self.dotupdate:
                f.write(" UPDATE")
            if not self.dotoverwrite:
                f.write(" DONT-OVERWRITE")
            if self.dotinclude is not None:
                f.write(" INCLUDE %s" % self.dotinclude)
            f.write("\n")
            progress += 1

        # CONFIG ...
        if self.config is not None:
            f.write("CONFIG %s\n" % self.config)
            progress += 1

        # NODE_STATUS_FILE ...
        if self.node_status_file is not None:
            f.write("NODE_STATUS_FILE %s" % self.node_status_file)
            if self.node_status_file_updatetime is not None:
                f.write(" %d" % self.node_status_file_updatetime)
            f.write("\n")
            progress += 1

        # JOBSTATE_LOG ...
        if self.jobstate_log is not None:
            f.write("JOBSTATE_LOG %s\n" % self.jobstate_log)
            progress += 1

        # MAXJOBS ...
        if set(node.category for node in self.nodes.values() if node.category is not None) - set(self.maxjobs):
            raise ValueError("no MAXJOBS statement(s) for node category(ies) %s" % ", ".join(sorted(set(node.category for node in self.nodes.values() if node.category is not None) - set(self.maxjobs))))
        for name, value in sorted(self.maxjobs.items()):
            if value is not None:
                f.write("MAXJOBS %s %d\n" % (name, value))
                progress += 1

        # JOB/DATA/SUBDAG ... (and things that go with them)
        for name, node in sorted(self.nodes.items()):
            if rescue is not None:
                if node.done:
                    rescue.write("DONE %s\n" % node.name)
                # save done state, then clear
                done = node.done
                node.done = False
            node.write(f, progress = progress)
            if rescue is not None:
                # restore done state
                node.done = done

        # PARENT ... CHILD ...
        names = set(self.nodes)
        parents_of = {}
        for name, node in self.nodes.items():
            parents_of.setdefault(frozenset(child.name for child in node.children) & names, set()).add(node.name)
        for children, parents in parents_of.items():
            if children:
                f.write("PARENT %s CHILD %s\n" % (" ".join(sorted(parents)), " ".join(sorted(children))))
                progress += 1

        # progress
        del progress
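When writing PARENT/CHILD lines, the method above groups together all parents that share an identical child set, so one line is emitted per distinct child set rather than one per edge.  The grouping step in isolation, on a hypothetical edge structure:

```python
# toy edge structure:  parent name --> set of child names
children_of = {
    "a": {"x", "y"},
    "b": {"x", "y"},
    "c": {"y"},
    "x": set(),
    "y": set(),
}

# group parents by their (frozen, hashable) child set, as DAG.write() does
parents_of = {}
for name, children in children_of.items():
    parents_of.setdefault(frozenset(children), set()).add(name)

# one PARENT ... CHILD ... line per distinct, non-empty child set
lines = sorted(
    "PARENT %s CHILD %s" % (" ".join(sorted(parents)), " ".join(sorted(children)))
    for children, parents in parents_of.items() if children
)
```

Here "a" and "b" share the child set {"x", "y"}, so they collapse onto a single PARENT line; leaf nodes (empty child sets) produce no line at all.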
    def dot_source(self, title = "DAG", rename = False, colour = "black", bgcolour = "#a3a3a3", statecolours = {'wait': 'yellow', 'idle': 'yellow', 'run': 'lightblue', 'abort': 'red', 'stop': 'red', 'success': 'green', 'fail': 'red'}):
        """
        Return a string containing DOT code to generate a
        visualization of the DAG graph.  See
        http://www.graphviz.org for more information.

        title provides a title for the graph.  If rename is True,
        instead of using the names of the nodes for the node names
        in the graph, numbers will be used instead.  The numbers
        are assigned to the nodes in alphabetical order by node
        name.  This might be required if the nodes have names that
        are incompatible with the DOT syntax.

        colour and bgcolour set the outline colour of the graph
        nodes and the background colour for the graph respectively.
        statecolours is a dictionary mapping node state (see the
        .state attribute of the JOB class and its derivatives) to a
        colour.  Set statecolours to None to disable state-based
        colouring of graph nodes.

        Example:

        >>> print(dag.dot_source(statecolours = None))

        BUGS:  the JOB class does not implement the ability to
        retrieve the job state at this time, therefore it is always
        necessary to set statecolours to None.  This might change
        in the future.
        """
        # set up renaming map

        if rename:
            namemap = dict((name, str(n)) for n, name in enumerate(sorted(self.nodes), start = 1))
        else:
            namemap = dict((name, name) for name in self.nodes)

        # generate dot code

        code = 'digraph "%s" {\nnode [color="%s", href="\\N"];\ngraph [bgcolor="%s"];\n' % (title, colour, bgcolour)
        for node in self.nodes.values():
            if statecolours is not None:
                code += '"%s"[color="%s"];\n' % (namemap[node.name], statecolours[node.state])
            for child in node.children:
                code += '"%s" -> "%s";\n' % (namemap[node.name], namemap[child.name])
        code += '}\n'

        # done

        return code