Package glue :: Module workflow
[hide private]
[frames] | no frames]

Source Code for Module glue.workflow

  1  # Copyright (C) 2014  Alex Nitz 
  2  # 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify it 
  5  # under the terms of the GNU General Public License as published by the 
  6  # Free Software Foundation; either version 3 of the License, or (at your 
  7  # option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, but 
 10  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
 12  # Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License along 
 15  # with this program; if not, write to the Free Software Foundation, Inc., 
 16  # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. 
 17   
 18   
 19  # 
 20  # ============================================================================= 
 21  # 
 22  #                                   Preamble 
 23  # 
 24  # ============================================================================= 
 25  # 
 26  """ This module provides thin wrappers around Pegasus.DAX3 functionality that 
 27  provides additional abstraction and argument handling. 
 28  """ 
 29  import Pegasus.DAX3 as dax 
 30   
class ProfileShortcuts(object):
    """ Mixin of convenience setters for Pegasus profile entries.

    Every setter forwards to ``self.add_profile(namespace, key, value)``,
    which the inheriting class (executables and nodes in this module) must
    implement. Resource requests and the universe go into the ``'condor'``
    namespace; category, priority and retry count go into ``'dagman'``.
    """

    def set_memory(self, size):
        """ Request ``size`` megabytes of memory (condor ``request_memory``).
        """
        request = '%sM' % size
        self.add_profile('condor', 'request_memory', request)

    def set_storage(self, size):
        """ Request ``size`` megabytes of disk (condor ``request_disk``).
        """
        request = '%sM' % size
        self.add_profile('condor', 'request_disk', request)

    def set_num_cpus(self, number):
        """ Request ``number`` CPUs (condor ``request_cpus``), passed as-is.
        """
        self.add_profile('condor', 'request_cpus', number)

    def set_universe(self, universe):
        """ Select the condor universe the job runs in.
        """
        self.add_profile('condor', 'universe', universe)

    def set_category(self, category):
        """ Set the dagman category of the job.
        """
        self.add_profile('dagman', 'category', category)

    def set_priority(self, priority):
        """ Set the dagman priority of the job.
        """
        self.add_profile('dagman', 'priority', priority)

    def set_num_retries(self, number):
        """ Set how many times dagman should retry the job (``retry``).
        """
        self.add_profile('dagman', 'retry', number)

class Executable(ProfileShortcuts):
    """ The workflow representation of an executable.

    Wraps a ``dax.Executable`` whose logical name is ``name`` followed by
    ``_ID<n>``, where ``<n>`` is taken from a class-level counter that is
    bumped on every construction. Executables built from the same ``name``
    therefore still receive distinct logical names. Physical file names
    (PFNs) registered through ``add_pfn`` are also remembered per site in
    ``self.pfns``.
    """
    # Class-wide counter used to make logical names unique.
    id = 0

    def __init__(self, name, namespace=None, os='linux', arch='x86_64',
                 installed=True, version=None):
        self.name = name
        counter = Executable.id
        self.logical_name = self.name + "_ID%s" % str(counter)
        Executable.id = counter + 1
        self.namespace = namespace
        self.version = version
        self._dax_executable = dax.Executable(
            self.logical_name,
            namespace=self.namespace,
            version=version,
            os=os,
            arch=arch,
            installed=installed,
        )
        # Set to True by the workflow once this executable has been recorded.
        self.in_workflow = False
        # Maps site name -> URL of the physical executable.
        self.pfns = {}

    def add_pfn(self, url, site='local'):
        """ Register ``url`` as the location of this executable at ``site``.
        """
        self._dax_executable.PFN(url, site)
        self.pfns[site] = url

    def get_pfn(self, site='local'):
        """ Return the URL registered for ``site`` (KeyError if none).
        """
        return self.pfns[site]

    def insert_into_dax(self, dax):
        """ Add the wrapped executable to the given ``dax`` object.
        """
        # NOTE: within this method the parameter shadows the module-level
        # ``dax`` import.
        dax.addExecutable(self._dax_executable)

    def add_profile(self, namespace, key, value):
        """ Attach a ``dax.Profile`` entry to the wrapped executable.
        """
        self._dax_executable.addProfile(dax.Profile(namespace, key, value))

class Node(ProfileShortcuts):
    """ A single job of the workflow.

    A node wraps a ``dax.Job`` built from an ``Executable`` and collects the
    job's positional arguments, options, and the data it reads and writes.
    The collected arguments and options are only attached to the underlying
    job when ``_finalize`` is called.
    """
    def __init__(self, executable):
        self.in_workflow = False
        self.executable = executable
        self._inputs = []
        self._outputs = []
        self._dax_node = dax.Job(name=executable.logical_name,
                                 version=executable.version,
                                 namespace=executable.namespace)
        self._args = []
        self._options = []

    def add_arg(self, arg):
        """ Add a positional argument.

        ``File`` objects are kept as-is; anything else is converted to str.
        """
        if not isinstance(arg, File):
            arg = str(arg)
        self._args.append(arg)

    def add_opt(self, opt, value=None):
        """ Add an option, optionally followed by its value.

        Parameters
        ----------
        opt : str
            The option flag, e.g. ``'--output'``.
        value : optional
            The option's value. ``None`` (the default) adds ``opt`` as a bare
            flag. ``File`` objects are kept as-is; any other value is
            converted to str. Falsy values such as ``0`` are still emitted.
        """
        # Compare against None explicitly: a truthiness test would silently
        # turn e.g. ``add_opt('--seed', 0)`` into a bare ``--seed`` flag.
        if value is not None:
            if not isinstance(value, File):
                value = str(value)
            self._options += [opt, value]
        else:
            self._options.append(opt)

    # private functions to add input and output data sources/sinks
    def _add_input(self, inp):
        """ Record ``inp`` as a source of input data for this node.
        """
        self._inputs.append(inp)
        inp._set_as_input_of(self)

    def _add_output(self, out):
        """ Record ``out`` as a destination of output data of this node.

        Also marks this node as the producer of ``out`` (``out.node``).
        """
        self._outputs.append(out)
        out.node = self
        out._set_as_output_of(self)

    # public functions to add options, arguments with or without data sources
    def add_input_opt(self, opt, inp):
        """ Add an option whose value is an input.
        """
        self.add_opt(opt, inp._dax_repr())
        self._add_input(inp)

    def add_output_opt(self, opt, out):
        """ Add an option whose value is an output.
        """
        self.add_opt(opt, out._dax_repr())
        self._add_output(out)

    def add_output_list_opt(self, opt, outputs):
        """ Add an option followed by a list of outputs.
        """
        self.add_opt(opt)
        for out in outputs:
            # Use the DAX representation, as the single-output variants do.
            self.add_opt(out._dax_repr())
            self._add_output(out)

    def add_input_list_opt(self, opt, inputs):
        """ Add an option followed by a list of inputs.
        """
        self.add_opt(opt)
        for inp in inputs:
            # Use the DAX representation, as the single-input variants do.
            self.add_opt(inp._dax_repr())
            self._add_input(inp)

    def add_input_arg(self, inp):
        """ Add an input as a positional argument.
        """
        self.add_arg(inp._dax_repr())
        self._add_input(inp)

    def add_output_arg(self, out):
        """ Add an output as a positional argument.
        """
        self.add_arg(out._dax_repr())
        self._add_output(out)

    def new_output_file_opt(self, opt, name):
        """ Add an option for a new output file and return its ``File``.
        """
        fil = File(name)
        self.add_output_opt(opt, fil)
        return fil

    # functions to describe properties of this node
    def add_profile(self, namespace, key, value):
        """ Add profile information to this node at the DAX level.
        """
        entry = dax.Profile(namespace, key, value)
        self._dax_node.addProfile(entry)

    def _finalize(self):
        """ Attach the positional arguments, then the options, to the job.
        """
        args = self._args + self._options
        self._dax_node.addArguments(*args)

class Workflow(object):
    """ A collection of nodes that is written out as a Pegasus DAX (ADAG).

    Nodes are added with ``add_node`` (or ``workflow += node``). Parent/child
    dependencies are derived from the data each node consumes, and data not
    produced by any node is recorded as a workflow input. ``save`` writes the
    resulting DAX XML to a file.
    """
    def __init__(self, name='my_workflow'):
        self.name = name
        self._adag = dax.ADAG(name)

        self._inputs = []
        self._outputs = []
        self._executables = []

    def add_node(self, node):
        """ Add a node to this workflow

        This function adds nodes to the workflow. It also determines
        parent/child relations from the DataStorage inputs to this job.

        Parameters
        ----------
        node : Node
            A node that should be executed as part of this workflow.

        Returns
        -------
        Workflow
            This workflow, so that ``workflow += node`` works.

        Raises
        ------
        ValueError
            If an input of ``node`` is produced by a node that has not been
            added to the workflow yet. The check runs before any state is
            modified, so the node can be added again once its parents are in.
        """
        # Validate up front so that a rejected node is not left half-added
        # (arguments finalized, job already in the ADAG, some dependencies).
        for inp in node._inputs:
            parent = inp.node
            if (parent is not None and parent is not node
                    and not parent.in_workflow):
                raise ValueError('Parents of this node must be added to the '
                                 'workflow first.')

        node._finalize()
        node.in_workflow = True
        self._adag.addJob(node._dax_node)

        # Determine the parent child relationships based on the inputs that
        # this node requires.
        for inp in node._inputs:
            if inp.node is not None:
                # Every producing node is in the workflow by now (see above).
                dep = dax.Dependency(parent=inp.node._dax_node,
                                     child=node._dax_node)
                self._adag.addDependency(dep)
            elif not inp.workflow_input:
                # Data not produced by any node is an external input of the
                # workflow; record it only once.
                self._inputs.append(inp)
                inp.workflow_input = True

        # Record the outputs that this node generates
        self._outputs += node._outputs

        # Record the executable that this node uses
        if not node.executable.in_workflow:
            node.executable.in_workflow = True
            self._executables.append(node.executable)

        return self

    __iadd__ = add_node

    def save(self, filename):
        """ Write this workflow to a DAX file.

        The file is closed (and therefore flushed) before returning.
        """
        with open(filename, "w") as f:
            self._adag.writeXML(f)

class DataStorage(object):
    """ A workflow representation of a place to store and read data from.

    The abstract representation of a place to store and read data from. This
    can include files, databases, or remote connections. This object is
    used as a handle to pass between functions, and is used as a way to
    logically represent the order of operations on the physical data.
    """
    def __init__(self, name):
        self.name = name
        # The node that produces this data, or None if it is not generated
        # inside the workflow (a node sets this when it adds it as output).
        self.node = None
        # Whether this data has already been recorded as a workflow input.
        self.workflow_input = False

    def _set_as_input_of(self, node):
        """ Hook called when this data becomes an input of ``node``.

        Does nothing by default; subclasses override it to register the data
        with the node's underlying job.
        """
        pass

    def _set_as_output_of(self, node):
        """ Hook called when this data becomes an output of ``node``.

        Does nothing by default; subclasses override it to register the data
        with the node's underlying job.
        """
        pass

    # Older hook names, kept for backward compatibility. Nodes call the
    # ``_set_as_input_of`` / ``_set_as_output_of`` variants above.
    def _set_as_node_input(self):
        pass

    def _set_as_node_output(self):
        pass

    def _dax_repr(self):
        """ Return how this data appears on a job's command line.
        """
        return self.name

class File(DataStorage, dax.File):
    """ The workflow representation of a physical file

    An object that represents a file from the perspective of setting up a
    workflow. The file may or may not exist at the time of workflow generation.
    If it does, this is represented by containing a physical file name (PFN).
    A storage path is also available to indicate the desired final
    destination of this file.
    """
    def __init__(self, name):
        DataStorage.__init__(self, name)
        dax.File.__init__(self, name)
        # Desired final destination of the file; None until set by the user.
        self.storage_path = None

    def _dax_repr(self):
        """ Files appear on the command line as the File object itself.
        """
        return self

    def _set_as_input_of(self, node):
        """ Declare this file as a transferred, unregistered input of ``node``.
        """
        node._dax_node.uses(self, link=dax.Link.INPUT, register=False,
                            transfer=True)

    def _set_as_output_of(self, node):
        """ Declare this file as a transferred, unregistered output of ``node``.
        """
        node._dax_node.uses(self, link=dax.Link.OUTPUT, register=False,
                            transfer=True)

    def output_map_str(self, site='local'):
        """ Return an output-map line: ``<name> <storage_path> pool="<site>"``.

        Parameters
        ----------
        site : str, optional
            The pool the entry refers to; defaults to ``'local'``.
        """
        return '%s %s pool="%s"' % (self.name, self.storage_path, site)

    def insert_into_dax(self, dax):
        """ Add this file to the given ``dax`` object.
        """
        # NOTE: within this method the parameter shadows the module-level
        # ``dax`` import.
        dax.addFile(self)

class Database(DataStorage):
    """ A workflow representation of a database used to store or read data.

    Currently adds nothing on top of ``DataStorage``.
    """