40 import subprocess, socket, tempfile, copy, doctest
41 from glue
import pipeline, lal
42 from glue.ligolw
import utils, lsctables, array
def which(prog):
	"""
	Like the which program to find the path to an executable

	@param prog: name of the program to look up in the user's $PATH
	@return: the path printed by which(1), stripped of whitespace
	@raise ValueError: if the program cannot be found in the path
	"""
	proc = subprocess.Popen(['which', prog], stdout=subprocess.PIPE)
	out = proc.stdout.read().strip()
	proc.wait()  # reap the child so it does not linger as a zombie
	if not out:
		# FIX: the original formatted a one-%s message with (prog, prog),
		# which raises TypeError instead of printing the intended error.
		sys.stderr.write("ERROR: could not find %s in your path, have you built the proper software and source the proper env. scripts?\n" % (prog,))
		raise ValueError("%s not found in path" % (prog,))
	return out
def condor_scratch_space():
	"""
	A way to standardize the condor scratch space even if it changes

	>>> condor_scratch_space()
	'_CONDOR_SCRATCH_DIR'
	"""
	# single point of truth for the condor scratch environment variable name
	scratch_env_var = "_CONDOR_SCRATCH_DIR"
	return scratch_env_var
def log_path():
	"""
	The stupid pet tricks to find log space on the LDG.
	Defaults to checking TMPDIR first.
	"""
	host = socket.getfqdn()
	try:
		# honor $TMPDIR when the site defines it
		return os.environ['TMPDIR']
	except KeyError:
		print "\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n"
		# fall back to known per-site scratch locations, keyed off the
		# fully qualified domain name of this host
		# NOTE(review): the try/except and the return after each fallback
		# were reconstructed from a damaged extraction -- confirm upstream
		if 'cit' in host or 'caltech.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phys.uwm.edu' in host:
			tmp = '/localscratch/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'aei.uni-hannover.de' in host:
			tmp = '/local/user/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		if 'phy.syr.edu' in host:
			tmp = '/usr1/' + os.environ['USER']
			print "falling back to ", tmp
			return tmp
		# unknown site and no $TMPDIR: give up loudly
		raise KeyError("$TMPDIR is not set and I don't recognize this environment")
111 class DAG(pipeline.CondorDAG):
113 A thin subclass of pipeline.CondorDAG.
115 Extra features include an add_node() method and a cache writing method.
116 Also includes some standard setup, e.g., log file paths etc.
	def __init__(self, name, logpath = log_path()):
		# NOTE(review): the logpath default is evaluated once, at class
		# definition time (standard Python default semantics), so later
		# changes to $TMPDIR are not seen -- confirm this is intended.
		self.basename = name.replace(".dag", "")
		# point the tempfile module at the log area so the condor log
		# file generated below lands there
		tempfile.tempdir = logpath
		tempfile.template = self.basename + '.dag.log.'
		# mktemp() only *names* a file (inherently racy); the file itself
		# is created by the open() below
		logfile = tempfile.mktemp()
		fh = open( logfile, "w" )
		fh.close()
		pipeline.CondorDAG.__init__(self,logfile)
		# NOTE(review): the following bookkeeping lines were reconstructed
		# from a damaged extraction -- confirm against the original file
		self.set_dag_file(self.basename)
		self.jobsDict = {}
		self.output_cache = []
130 def add_node(self, node):
132 node.add_macro(
"macronodename", node.get_name())
133 pipeline.CondorDAG.add_node(self, node)
135 def write_cache(self):
145 A job class that subclasses pipeline.CondorDAGJob and adds some extra
146 boiler plate items for gstlal inspiral jobs
	def __init__(self, executable, tag_base, universe = "vanilla"):
		# NOTE(review): the attribute setup and CondorDAGJob.__init__ call
		# were reconstructed from a damaged extraction -- confirm upstream
		self.__prog__ = tag_base
		self.__executable = executable
		self.__universe = universe
		pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
		# run jobs in the submitting user's environment
		self.add_condor_cmd('getenv', 'True')
		# stop gstreamer from rescanning its plugin registry in every job
		self.add_condor_cmd('environment', "GST_REGISTRY_UPDATE=no;")
		self.tag_base = tag_base
		self.set_sub_file(tag_base+'.sub')
		# $(macronodename) is substituted per node via DAG.add_node()'s macro
		self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
		self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
		# per-job counter used by InspiralNode to build unique node names
		self.number = 1
170 A node class that subclasses pipeline.CondorDAGNode that automates
171 adding the node to the dag, makes sensible names and allows a list of parent
172 nodes to be provided.
174 def __init__(self, job, dag, p_node=[]):
175 pipeline.CondorDAGNode.__init__(self, job)
178 self.set_name(
"%s_%04X" % (job.tag_base, job.number))
185 A generic job class which tends to do the "right" thing when given just
186 an executable name but otherwise is a subclass of InspiralJob and thus
187 pipeline.CondorDAGJob
189 def __init__(self, program, tag_base = None, condor_commands = {}, **kwargs):
190 executable =
which(program)
191 InspiralJob.__init__(self, executable, tag_base
or os.path.split(executable)[1], **kwargs)
192 for cmd,val
in condor_commands.items():
193 self.add_condor_cmd(cmd, val)
198 A generic node class which tends to do the "right" thing when given a
199 job, a dag, parent nodes, a dictionary options relevant to the job, a
200 dictionary of options related to input files and a dictionary of options
201 related to output files. Otherwise it is a subclass of InspiralNode and thus
202 pipeline.CondorDAGNode
204 NOTE and important and subtle behavior - You can specify an option with
205 an empty argument by setting it to "". However options set to None are simply
	def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}):
		"""
		Build an InspiralNode and translate the opts/input_files/
		output_files dictionaries into command line options and
		arguments.  A value of None means the option is skipped
		entirely; "" is a legitimate empty argument.
		"""
		InspiralNode.__init__(self, job, dag, parent_nodes)
		# NOTE(review): these two attribute assignments were reconstructed
		# from a damaged extraction -- confirm against the original file
		self.input_files = input_files
		self.output_files = output_files
		# NOTE: dict.items() + dict.items() concatenates lists -- Python 2
		# only; under Python 3 items() views cannot be added
		for opt, val in opts.items() + output_files.items() + input_files.items():
			if val is None:
				# None means "do not pass this option"; distinct from ""
				continue
			# hasattr(val, "__iter__") catches lists/tuples but not
			# Python 2 strings, which lack __iter__
			if not hasattr(val, "__iter__"):
				# scalar: an empty option name means a bare argument
				if opt == "":
					self.add_var_arg(val)
				else:
					self.add_var_opt(opt, val)
			else:
				# iterable value: add each element as a bare argument
				# NOTE(review): the branch structure here was reconstructed
				# from a damaged extraction -- confirm upstream
				if opt == "":
					[self.add_var_arg(a) for a in val]
				else:
					self.add_var_opt(opt, " ".join([str(a) for a in val]))
def pipeline_dot_py_append_opts_hack(opt, vals):
	"""
	A way to work around the dictionary nature of pipeline.py which can
	only record options once.

	>>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3])
	'1 --my-favorite-option 2 --my-favorite-option 3'
	"""
	# join() builds the string in one pass (the original += loop was
	# quadratic) and returns "" for an empty vals list instead of
	# raising IndexError on vals[0]
	return (" --%s " % (opt,)).join(str(v) for v in vals)
def group(inlist, parts):
	"""
	group a list roughly according to the distribution in parts, e.g.
	successive chunks of inlist are yielded with sizes cycling through
	the entries of parts, scaled so the whole list is consumed.
	"""
	# each size in parts is reused up to mult_factor times so that
	# sum(parts) * mult_factor >= len(inlist)
	mult_factor = len(inlist) // sum(parts) + 1
	# work on a deep copy so the caller's list is not consumed
	l = copy.deepcopy(inlist)
	for i, p in enumerate(parts):
		for j in range(mult_factor):
			# NOTE(review): the chunk emission below was reconstructed
			# from a damaged extraction -- confirm against the original
			if len(l) > 0:
				yield l[:p]
				del l[:p]
def parse_cache_str(instr):
	"""
	A way to decode a command line option that specifies different bank
	caches for different detectors, e.g.,

	>>> parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache,V1=V1_split_bank.cache") == {'V1': 'V1_split_bank.cache', 'H1': 'H1_split_bank.cache', 'L1': 'L1_split_bank.cache'}
	True

	Returns an empty dictionary when instr is None.
	"""
	dictcache = {}
	if instr is None:
		return dictcache
	for c in instr.split(','):
		ifo = c.split("=")[0]
		# FIX: replace() without a count removed *every* "<ifo>="
		# occurrence, corrupting cache paths that happen to contain
		# "<ifo>=" -- strip only the leading one
		cache = c.replace(ifo + "=", "", 1)
		dictcache[ifo] = cache
	return dictcache
def build_bank_groups(cachedict, numbanks = [2], maxjobs = None):
	# NOTE(review): the function name and the defaults above were
	# reconstructed from a damaged extraction -- confirm upstream
	"""
	given a dictionary of bank cache files keyed by ifo from .e.g.,
	parse_cache_str(), group the banks into suitable size chunks for a single svd
	bank file according to numbanks. Note, numbanks can be should be a list and uses
	the algorithm in the group() function
	"""
	outstrs = []
	ifos = sorted(cachedict.keys())
	# transpose: one tuple per template bank, holding that bank's file
	# for every ifo (assumes each ifo's cache lists banks in the same order)
	# NOTE(review): the cache files opened here are never closed
	files = zip(*[[lal.CacheEntry(f).path for f in open(cachedict[ifo], 'r').readlines()] for ifo in ifos])
	for n, bank_group in enumerate(group(files, numbanks)):
		# stop early once the requested maximum number of jobs is reached
		if maxjobs is not None and n > maxjobs:
			break
		# regroup per-ifo: ifo -> tuple of bank files for this chunk
		c = dict(zip(ifos, zip(*bank_group)))
		outstrs.append(c)
	return outstrs
def T050017_filename(instruments, description, start, end, extension, path = None):
	"""
	A function to generate a T050017 filename.

	@param instruments: a string (used as-is) or an iterable of
		instrument names, which are sorted and concatenated
	@param description: the description field of the filename
	@param start: GPS start time
	@param end: GPS end time (the filename records end - start)
	@param extension: file extension; surrounding dots are stripped
	@param path: optional directory to prefix to the filename
		NOTE(review): the default for path was not visible in the
		damaged source -- None assumed; confirm against the original
	"""
	# FIX: idiomatic isinstance() instead of type(...) != type(str())
	if not isinstance(instruments, str):
		instruments = "".join(sorted(instruments))
	duration = end - start
	extension = extension.strip('.')
	if path is not None:
		return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
	return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
327 if __name__ ==
"__main__":