gstlal-inspiral 0.4.2
inspiral_pipe.py
1 # Copyright (C) 2013--2014 Kipp Cannon, Chad Hanna
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License as published by the
5 # Free Software Foundation; either version 2 of the License, or (at your
6 # option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
11 # Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 
17 ##
18 # @file
19 #
20 # A file that contains the inspiral_pipe module code; used to construct Condor DAGs
21 #
22 
23 ##
24 # @package inspiral_pipe
25 #
26 # A module that contains the inspiral_pipe module code; used to construct Condor DAGs
27 #
28 # ### Review Status
29 #
30 # | Names | Hash | Date |
31 # | ------------------------------------------- | ------------------------------------------- | ---------- |
32 # | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e | 2014-06-18 |
33 #
34 # #### Actions
35 #
36 # - In inspiral_pipe.py fix the InspiralJob.__init__: fix the arguments
37 # - On line 201, fix the comment or explain what the comment is meant to be
38 
39 import sys, os
40 import subprocess, socket, tempfile, copy
41 from glue import pipeline, lal
42 from glue.ligolw import utils, lsctables, array
43 
44 
45 #
46 # environment utilities
47 #
48 
49 
50 def which(prog):
51  """!
52  Like the which program: finds the path to an executable
53 
54  >>> which("ls")
55  '/bin/ls'
56 
57  """
58  which = subprocess.Popen(['which',prog], stdout=subprocess.PIPE)
59  out = which.stdout.read().strip()
60  if not out:
61  print >>sys.stderr, "ERROR: could not find %s in your path, have you built the proper software and sourced the proper env. scripts?" % (prog,)
62  raise ValueError
63  return out
64 
65 
66 def condor_scratch_space():
67  """!
68  A way to standardize the condor scratch space even if it changes
69  >>> condor_scratch_space()
70  '_CONDOR_SCRATCH_DIR'
71  """
72  return "_CONDOR_SCRATCH_DIR"
73 
74 
75 def log_path():
76  """!
77  The stupid pet tricks to find log space on the LDG.
78  Defaults to checking TMPDIR first.
79  """
80  host = socket.getfqdn()
81  try:
82  return os.environ['TMPDIR']
83  except KeyError:
84  print "\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a user's temporary files should be\n"
85  #FIXME add more hosts as you need them
86  if 'cit' in host or 'caltech.edu' in host:
87  tmp = '/usr1/' + os.environ['USER']
88  print "falling back to ", tmp
89  return tmp
90  if 'phys.uwm.edu' in host:
91  tmp = '/localscratch/' + os.environ['USER']
92  print "falling back to ", tmp
93  return tmp
94  if 'aei.uni-hannover.de' in host:
95  tmp = '/local/user/' + os.environ['USER']
96  print "falling back to ", tmp
97  return tmp
98  if 'phy.syr.edu' in host:
99  tmp = '/usr1/' + os.environ['USER']
100  print "falling back to ", tmp
101  return tmp
102 
103  raise KeyError("$TMPDIR is not set and I don't recognize this environment")
104 
105 
106 #
107 # DAG class
108 #
109 
110 
111 class DAG(pipeline.CondorDAG):
112  """!
113  A thin subclass of pipeline.CondorDAG.
114 
115  Extra features include an add_node() method and a cache writing method.
116  Also includes some standard setup, e.g., log file paths etc.
117  """
118  def __init__(self, name, logpath = log_path()):
119  self.basename = name.replace(".dag","")
120  tempfile.tempdir = logpath
121  tempfile.template = self.basename + '.dag.log.'
122  logfile = tempfile.mktemp()
123  fh = open( logfile, "w" )
124  fh.close()
125  pipeline.CondorDAG.__init__(self,logfile)
126  self.set_dag_file(self.basename)
127  self.jobsDict = {}
128  self.output_cache = []
129 
130  def add_node(self, node):
131  node.set_retry(3)
132  node.add_macro("macronodename", node.get_name())
133  pipeline.CondorDAG.add_node(self, node)
134 
135  def write_cache(self):
136  out = self.basename + ".cache"
137  f = open(out,"w")
138  for c in self.output_cache:
139  f.write(str(c)+"\n")
140  f.close()
141 
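# A minimal usage sketch, assuming a working glue/Condor environment; the
# DAG name and workflow are hypothetical:
#
#   dag = DAG("my_analysis")
#   # ... create jobs and nodes; InspiralNode adds itself to the dag ...
#   dag.write_sub_files()  # inherited from pipeline.CondorDAG
#   dag.write_dag()        # inherited from pipeline.CondorDAG
#   dag.write_cache()      # the extra cache-writing method added by this subclass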
142 
143 class InspiralJob(pipeline.CondorDAGJob):
144  """!
145  A job class that subclasses pipeline.CondorDAGJob and adds some extra
146  boiler plate items for gstlal inspiral jobs
147  """
148  def __init__(self, executable, tag_base, universe = "vanilla"):
149  self.__prog__ = tag_base
150  self.__executable = executable
151  self.__universe = universe
152  pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
153  self.add_condor_cmd('getenv','True')
154  self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;")
155  self.tag_base = tag_base
156  self.set_sub_file(tag_base+'.sub')
157  self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
158  self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
159  self.number = 1
160  # make an output directory for files
161  self.output_path = tag_base
162  try:
163  os.mkdir(self.output_path)
164  except OSError: # directory already exists
165  pass
166 
167 
168 class InspiralNode(pipeline.CondorDAGNode):
169  """!
170  A node class that subclasses pipeline.CondorDAGNode that automates
171  adding the node to the dag, makes sensible names and allows a list of parent
172  nodes to be provided.
173  """
174  def __init__(self, job, dag, p_node=[]):
175  pipeline.CondorDAGNode.__init__(self, job)
176  for p in p_node:
177  self.add_parent(p)
178  self.set_name("%s_%04X" % (job.tag_base, job.number))
179  job.number += 1
180  dag.add_node(self)
181 
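# A minimal sketch of how InspiralJob and InspiralNode combine, assuming the
# DAG class above; the executable path is hypothetical:
#
#   dag = DAG("example")
#   job = InspiralJob("/usr/bin/gstlal_inspiral", "gstlal_inspiral")
#   parent = InspiralNode(job, dag)                    # named gstlal_inspiral_0001
#   child = InspiralNode(job, dag, p_node = [parent])  # runs after parent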
182 
183 class generic_job(InspiralJob):
184  """!
185  A generic job class which tends to do the "right" thing when given just
186  an executable name but otherwise is a subclass of InspiralJob and thus
187  pipeline.CondorDAGJob
188  """
189  def __init__(self, program, tag_base = None, condor_commands = {}, **kwargs):
190  executable = which(program)
191  InspiralJob.__init__(self, executable, tag_base or os.path.split(executable)[1], **kwargs)
192  for cmd,val in condor_commands.items():
193  self.add_condor_cmd(cmd, val)
194 
195 
196 class generic_node(InspiralNode):
197  """!
198  A generic node class which tends to do the "right" thing when given a
199  job, a dag, parent nodes, a dictionary options relevant to the job, a
200  dictionary of options related to input files and a dictionary of options
201  related to output files. Otherwise it is a subclass of InspiralNode and thus
202  pipeline.CondorDAGNode
203 
204  NOTE, an important and subtle behavior - You can specify an option with
205  an empty argument by setting it to "". However, options set to None are simply
206  ignored.
207  """
208  def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}):
209  InspiralNode.__init__(self, job, dag, parent_nodes)
210 
211  self.input_files = input_files
212  self.output_files = output_files
213 
214  for opt, val in opts.items() + output_files.items() + input_files.items():
215  if val is None:
216  continue # not the same as val = '' which is allowed
217  if not hasattr(val, "__iter__"): # scalars and strings (strings lack __iter__ in Python 2)
218  if opt == "":
219  self.add_var_arg(val)
220  else:
221  self.add_var_opt(opt, val)
222  # Must be an iterable
223  else:
224  if opt == "":
225  for a in val: self.add_var_arg(a)
226  else:
227  self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))
228 
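# A minimal sketch of the option handling described above, assuming a dag
# built as earlier; the program, option names and file names are hypothetical:
#
#   job = generic_job("gstlal_inspiral")
#   node = generic_node(job, dag, [],
#     opts = {"verbose": "",           # emitted as --verbose with an empty argument
#             "threads": None,         # ignored entirely
#             "channel": ["a", "b"]},  # repeated: --channel a --channel b
#     input_files = {"frame-cache": "frames.cache"},
#     output_files = {"output": "out.xml.gz"})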
229 
230 def pipeline_dot_py_append_opts_hack(opt, vals):
231  """!
232  A way to work around the dictionary nature of pipeline.py which can
233  only record options once.
234 
235  >>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3])
236  '1 --my-favorite-option 2 --my-favorite-option 3'
237  """
238  out = str(vals[0])
239  for v in vals[1:]:
240  out += " --%s %s" % (opt, str(v))
241  return out
242 
243 
244 
245 #
246 # Utility functions
247 #
248 
249 
250 def group(inlist, parts):
251  """!
252  group a list roughly according to the distribution in parts, e.g.
253 
254  >>> A = range(12)
255  >>> B = [2,3]
256  >>> for g in group(A,B):
257  ... print g
258  ...
259  [0, 1]
260  [2, 3]
261  [4, 5]
262  [6, 7, 8]
263  [9, 10, 11]
264  """
265  mult_factor = len(inlist) // sum(parts) + 1
266  l = copy.deepcopy(inlist)
267  for i, p in enumerate(parts):
268  for j in range(mult_factor):
269  if not l:
270  break
271  yield l[:p]
272  del l[:p]
273 
274 
275 def parse_cache_str(instr):
276  """!
277  A way to decode a command line option that specifies different bank
278  caches for different detectors, e.g.,
279 
280  >>> bankcache = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache,V1=V1_split_bank.cache")
281  >>> bankcache
282  {'V1': 'V1_split_bank.cache', 'H1': 'H1_split_bank.cache', 'L1': 'L1_split_bank.cache'}
283  """
284 
285  dictcache = {}
286  if instr is None: return dictcache
287  for c in instr.split(','):
288  ifo = c.split("=")[0]
289  cache = c.replace(ifo+"=","")
290  dictcache[ifo] = cache
291  return dictcache
292 
293 
294 def build_bank_groups(cachedict, numbanks = [2], maxjobs = None):
295  """!
296  given a dictionary of bank cache files keyed by ifo, e.g., from
297  parse_cache_str(), group the banks into suitably sized chunks for a single svd
298  bank file according to numbanks. Note, numbanks should be a list; grouping uses
299  the algorithm in the group() function
300  """
301  outstrs = []
302  ifos = sorted(cachedict.keys())
303  files = zip(*[[lal.CacheEntry(f).path for f in open(cachedict[ifo],'r').readlines()] for ifo in ifos])
304  for n, bank_group in enumerate(group(files, numbanks)):
305  if maxjobs is not None and n >= maxjobs: # n is zero-based, so stop after maxjobs groups
306  break
307  c = dict(zip(ifos, zip(*bank_group)))
308  outstrs.append(c)
309 
310  return outstrs
311 
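# A minimal sketch, assuming hypothetical cache files each listing several
# template banks per detector; numbanks = [2, 3] groups them as in group():
#
#   cachedict = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache")
#   for bank_group in build_bank_groups(cachedict, numbanks = [2, 3]):
#     print bank_group  # e.g. {'H1': ('H1_bank0.xml', 'H1_bank1.xml'), 'L1': (...)}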
312 
313 def T050017_filename(instruments, description, start, end, extension, path = None):
314  """!
315  A function to generate a T050017 filename.
316  """
317  if not isinstance(instruments, str):
318  instruments = "".join(sorted(instruments))
319  duration = end - start
320  extension = extension.strip('.')
321  if path is not None:
322  return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
323  else:
324  return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
325 
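# A quick worked example of the naming convention implemented above; the
# descriptions and GPS times are hypothetical:
#
#   >>> T050017_filename(("H1", "L1"), "INJECTIONS", 966384015, 966484015, ".xml.gz")
#   'H1L1-INJECTIONS-966384015-100000.xml.gz'
#   >>> T050017_filename("H1", "TRIGGERS", 1000000000, 1000004096, "xml", path = "triggers")
#   'triggers/H1-TRIGGERS-1000000000-4096.xml'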
326 
327 if __name__ == "__main__":
328  import doctest
329  doctest.testmod()