gstlal  0.8.1
 All Classes Namespaces Files Functions Variables Pages
dagparts.py
1 # Copyright (C) 2010 Kipp Cannon (kipp.cannon@ligo.org)
2 # Copyright (C) 2010 Chad Hanna (chad.hanna@ligo.org)
3 #
4 # This program is free software; you can redistribute it and/or modify it under
5 # the terms of the GNU General Public License as published by the Free Software
6 # Foundation; either version 2 of the License, or (at your option) any later
7 # version.
8 #
9 # This program is distributed in the hope that it will be useful, but WITHOUT
10 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12 # details.
13 #
14 # You should have received a copy of the GNU General Public License along with
15 # this program; if not, write to the Free Software Foundation, Inc., 51
16 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 
18 
19 #
20 # =============================================================================
21 #
22 # Preamble
23 #
24 # =============================================================================
25 #
26 
27 
28 """
29 DAG construction tools.
30 """
31 
32 
33 import os
34 import sys
35 import socket
36 import subprocess
37 import tempfile
38 import math
39 
40 from glue import segments
41 from glue import pipeline
42 
43 
__author__ = "Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>"
# NOTE(review): "$Date$" and "$Revision$" look like version-control keyword
# placeholders that are not expanded in this file, hence the FIXMEs.
__date__ = "$Date$" #FIXME
__version__ = "$Revision$" #FIXME
47 
48 
49 #
50 # =============================================================================
51 #
52 # Environment utilities
53 #
54 # =============================================================================
55 #
56 
57 
def which(prog):
    """
    Return the path of the executable *prog*, as printed by the external
    ``which`` command.

    Raises ValueError, after printing an explanatory message to stderr, when
    ``which`` prints nothing for *prog*.
    """
    proc = subprocess.Popen(['which', prog], stdout=subprocess.PIPE)
    # communicate() drains the pipe and waits for the child, so no zombie
    # process or open file descriptor is left behind.
    out = proc.communicate()[0].strip()
    if not isinstance(out, str):
        # Python 3 pipes yield bytes; return text as Python 2 does.
        out = out.decode()
    if not out:
        # The message has a single %s, so it takes exactly one argument
        # (formatting it with (prog, prog) raised TypeError).
        msg = "ERROR: could not find %s in your path, have you built the proper software and sourced the proper environment scripts?" % (prog,)
        sys.stderr.write(msg + "\n")
        raise ValueError(msg)
    return out
65 
66 
def log_path():
    """
    Return a directory in which to place Condor DAG log files.

    When $TMPDIR is set its value is returned.  Otherwise a warning is
    printed and a per-user scratch directory is chosen from a table of known
    sites.  The table is matched by substring against this host's
    fully-qualified domain name, and the first match wins.

    Raises KeyError if $TMPDIR is unset and the host is not recognized.  It
    also raises KeyError if $USER is unset when a fallback is needed.
    """
    try:
        return os.environ['TMPDIR']
    except KeyError:
        pass

    print("\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n")

    # Resolve the FQDN only when it is actually needed: getfqdn() may
    # perform a name-service lookup.
    host = socket.getfqdn()

    #FIXME add more hosts as you need them
    # Each entry is (substrings looked for in the FQDN, scratch dir prefix).
    # NOTE(review): 'cit' is a plain substring test and matches any host name
    # containing those letters.
    fallbacks = (
        (('cit', 'caltech.edu'), '/usr1/'),
        (('phys.uwm.edu',), '/localscratch/'),
        (('aei.uni-hannover.de',), '/local/user/'),
        (('phy.syr.edu',), '/usr1/'),
    )
    for patterns, prefix in fallbacks:
        if any(pattern in host for pattern in patterns):
            tmp = prefix + os.environ['USER']
            print("falling back to  %s" % (tmp,))
            return tmp

    raise KeyError("$TMPDIR is not set and I don't recognize this environment")
92 
93 
94 #
95 # =============================================================================
96 #
97 # Condor DAG utilities
98 #
99 # =============================================================================
100 #
101 
102 
class CondorDAG(pipeline.CondorDAG):
    """
    A glue.pipeline.CondorDAG whose DAG file is named after *name*.

    It also gives every added node a distinct $(macroid) and can write a
    cache file listing self.output_cache.
    """

    def __init__(self, name, logpath = None):
        """
        name: base name used for the DAG file, the prefix of the DAG log
        file and the "<name>.cache" file written by write_cache().
        logpath: directory in which the DAG log file is created.  It defaults
        to log_path(), evaluated when the DAG is constructed rather than when
        the module is imported.  (It was previously ignored in favor of
        log_path().)
        """
        if logpath is None:
            logpath = log_path()
        self.basename = name
        # mkstemp creates a uniquely-named file; only its name is needed, so
        # the descriptor is closed immediately.
        fh, logfile = tempfile.mkstemp(dir = logpath, prefix = self.basename + '.dag.log.')
        os.close(fh)
        pipeline.CondorDAG.__init__(self, logfile)
        self.set_dag_file(self.basename)
        # NOTE(review): jobsDict is not used in this class; presumably
        # filled in by callers.
        self.jobsDict = {}
        # Counter that supplies each node's $(macroid).
        self.node_id = 0
        # Entries written one per line, via str(), by write_cache().
        self.output_cache = []

    def add_node(self, node, retry = 0):
        """
        Set *node*'s retry count, tag it with the macros "macroid" (a
        per-DAG counter) and "macronodename" (node.get_name()), and add it
        to the DAG.
        """
        node.set_retry(retry)
        self.node_id += 1
        node.add_macro("macroid", self.node_id)
        node.add_macro("macronodename", node.get_name())
        pipeline.CondorDAG.add_node(self, node)

    def write_cache(self):
        """
        Write str() of each entry of self.output_cache, one per line, to
        "<basename>.cache".
        """
        with open(self.basename + ".cache", "w") as f:
            for c in self.output_cache:
                f.write(str(c) + "\n")
128 
129 
class CondorDAGJob(pipeline.CondorDAGJob):
    """
    Generic vanilla-universe Condor job used by gstlal programs.

    The submit file is "<tag_base>.sub", the job is submitted with
    getenv = True, and each node writes its stdout/stderr to its own file
    under logs/.
    """
    def __init__(self, executable, tag_base):
        self.__prog__ = tag_base
        self.__executable, self.__universe = executable, 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('getenv', 'True')
        self.tag_base = tag_base
        self.set_sub_file(tag_base + '.sub')
        # $(macroid)/$(macronodename) are the per-node macros set by
        # CondorDAG.add_node; $(cluster)/$(process) are Condor submit macros.
        log_stem = 'logs/' + tag_base + '-$(macroid)-$(macronodename)-$(cluster)-$(process)'
        self.set_stdout_file(log_stem + '.out')
        self.set_stderr_file(log_stem + '.err')
        self.number = 1
145 
146 
class CondorDAGNode(pipeline.CondorDAGNode):
    """
    Generic Condor DAG node for gstlal jobs; adds itself to *dag* when
    constructed.

    job: the job this node runs.
    dag: the DAG the node is added to, via dag.add_node(self).
    p_node: iterable of parent nodes, each passed to add_parent().  The
    default is an immutable empty tuple rather than a shared mutable list.
    """
    def __init__(self, job, dag, p_node=()):
        pipeline.CondorDAGNode.__init__(self, job)
        for p in p_node:
            self.add_parent(p)
        dag.add_node(self)
156 
157 
158 #
159 # =============================================================================
160 #
161 # Segment utilities
162 #
163 # =============================================================================
164 #
165 
166 
def breakupseg(seg, maxextent, overlap):
    """
    Split one segment into a segmentlist of overlapping pieces.

    seg: the segment to split.  abs(seg) is used as its extent, and seg[0]
    and seg[1] as its start and end.
    maxextent: requested maximum extent of a piece, before the overlap is
    added; must be positive.  It is adjusted below so that the pieces come
    out roughly equal in size.
    overlap: amount by which consecutive pieces overlap.

    If abs(seg) < maxextent, a one-element segmentlist containing seg is
    returned.  Otherwise every piece except the last spans
    (adjusted maxextent + overlap).  Each following piece starts `overlap`
    before the end of the previous one, and the last piece ends at seg[1].

    Raises ValueError if maxextent <= 0.
    """
    if maxextent <= 0:
        raise ValueError, "maxextent must be positive, not %s" % repr(maxextent)

    # Simple case of only one segment
    if abs(seg) < maxextent:
        return segments.segmentlist([seg])

    # adjust maxextent so that segments are divided roughly equally
    # NOTE(review): under Python 2 with integer extents "/" truncates, so the
    # math.ceil() calls below may have no effect -- confirm intent.
    # NOTE(review): this raises ZeroDivisionError in two cases:
    #   - 0 < maxextent < 1, because int(maxextent) == 0;
    #   - the first line yields 0 (e.g. extent 1, maxextent 1, overlap 0).
    maxextent = max(int(abs(seg) / (int(abs(seg)) // int(maxextent) + 1)), overlap)
    maxextent = int(math.ceil(abs(seg) / math.ceil(abs(seg) / maxextent)))
    end = seg[1]

    seglist = segments.segmentlist()


    while abs(seg):
        if (seg[0] + maxextent + overlap) < end:
            # Emit a full-length piece, then continue from `overlap` before
            # that piece's end.
            seglist.append(segments.segment(seg[0], seg[0] + maxextent + overlap))
            seg = segments.segment(seglist[-1][1] - overlap, seg[1])
        else:
            # The remainder fits in one piece: run it to the original end.
            seglist.append(segments.segment(seg[0], end))
            break

    return seglist
192 
193 
def breakupsegs(seglist, maxextent, overlap):
    """
    Run breakupseg(seg, maxextent, overlap) on each segment of *seglist*, in
    order, and return all of the resulting pieces in one new segmentlist.
    *seglist* itself is not modified.
    """
    result = segments.segmentlist()
    extend = result.extend
    for seg in seglist:
        extend(breakupseg(seg, maxextent, overlap))
    return result
199 
200 
def breakupseglists(seglists, maxextent, overlap):
    """
    In place, replace each value of the mapping *seglists* with
    breakupsegs(value, maxextent, overlap).  Returns None.

    seglists: mapping from instrument to segmentlist (presumably a
    segmentlistdict; any mapping supporting items() and item assignment
    works).
    """
    # Iterate over a snapshot so that reassigning values during the loop is
    # safe for any mapping type.  items() also works on Python 3, whereas
    # iteritems() does not.  Delegating to breakupsegs keeps the per-list
    # splitting in one place.
    for instrument, seglist in list(seglists.items()):
        seglists[instrument] = breakupsegs(seglist, maxextent, overlap)