1
2
3 import os
4 import sys
5 import tempfile
6 from glue import pipeline
7 import tempfile
8
def construct_command(node):
    """
    Build the list of command-line tokens for a Condor DAG node.

    Concatenates the node's executable path with its command-line string and
    splits on single spaces, discarding empty/whitespace-only tokens.

    @param node: object providing job().get_executable() and get_cmd_line()
        (e.g. a pipeline.CondorDAGNode).
    @return: list of non-empty command-line tokens, executable first.
    """
    command_string = node.job().get_executable() + " " + node.get_cmd_line()
    return [opt for opt in command_string.split(" ") if opt.strip()]
12
13
def __init__(self, basename, log_path):
    """
    Condor DAG for an auxmvc analysis.

    @param basename: base name used for the DAG file and the Condor log file.
    @param log_path: directory in which the Condor log file is created.
    """
    self.basename = basename
    # Preserve the historical module-level tempfile defaults in case any
    # other code in this pipeline relies on them being set here.
    tempfile.tempdir = log_path
    tempfile.template = self.basename + '.dag.log.'
    # mkstemp creates the log file atomically, fixing the race inherent in
    # the original mktemp() + open() sequence.
    fd, logfile = tempfile.mkstemp(dir=log_path, prefix=self.basename + '.dag.log.')
    os.close(fd)
    pipeline.CondorDAG.__init__(self, logfile)
    self.set_dag_file(self.basename)
    # mapping of jobs in this DAG; populated elsewhere
    self.jobsDict = {}
25
def add_node(self, node):
    """Register *node* with this DAG (delegates to pipeline.CondorDAG)."""
    pipeline.CondorDAG.add_node(self, node)
29
30
31
32
33
34
36 """
37 A basic auxmvc job class. Sets common attributes needed for any auxmvc job. It uses a config parser object to
38 set the options.
39 """
def __init__(self, cp, sections, exec_name, tag_base='', id='', extension='', dax=False, short_opts=False):
    """
    A basic auxmvc job. Sets common attributes needed for any auxmvc job,
    reading its options from a ConfigParser object.

    cp = ConfigParser object from which options are read.
    sections = sections of the ConfigParser that get added to the opts
    exec_name = exec_name name in ConfigParser
    tag_base = prefix for the submit/stdout/stderr file names.
    id = suffix appended to tag_base in the stdout/stderr file names.
    extension = file extension associated with this job's output.
    dax = passed through to pipeline.AnalysisJob.
    short_opts = if True, ini options are added as short (single-dash) options.
    """
    self.__exec_name = exec_name
    self.__extension = extension
    self.tag_base = tag_base
    universe = cp.get('condor', 'universe')
    executable = cp.get('condor', exec_name)
    pipeline.CondorDAGJob.__init__(self, universe, executable)
    pipeline.AnalysisJob.__init__(self, cp, dax)
    self.add_condor_cmd('copy_to_spool', 'False')
    self.add_condor_cmd('getenv', 'True')
    # force serial execution inside MKL/OpenMP runtimes
    self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
    self.__use_gpus = cp.has_option('condor', 'use-gpus')
    for sec in sections:
        if cp.has_section(sec):
            if short_opts:
                self.add_short_ini_opts(cp, sec)
            else:
                self.add_ini_opts(cp, sec)
        else:
            # fix: original used the Python-2-only `print >>sys.stderr`
            # statement; this form emits the same text and is valid in
            # both Python 2 and 3
            sys.stderr.write("warning: config file is missing section [" + sec + "]\n")
    self.set_stdout_file('logs/' + tag_base + id + '.out')
    self.set_stderr_file('logs/' + tag_base + id + '.err')
    self.set_sub_file(tag_base + '.sub')
69
71 """
72 Set the exec_name name
73 """
74 self.__exec_name = exec_name
75
77 """
78 Set the exec_name name
79 """
80 self.__exec_name = exec_name
81
83 """
84 Get the exec_name name
85 """
86 return self.__exec_name
87
89 """
90 Set the file extension
91 """
92 self.__extension = extension
93
95 """
96 Get the extension for the file name
97 """
98 return self.__extension
99
101 """
102 Get whether this job was requested to run on a GPU node
103 """
104 return self.__use_gpus
105
107 """
108 Parse command line options from a given section in an ini file and
109 pass to the executable as short options.
110 @param cp: ConfigParser object pointing to the ini file.
111 @param section: section of the ini file to add to the options.
112 """
113 for opt in cp.options(section):
114 arg = cp.get(section,opt)
115 self.add_short_opt(opt,arg)
116
117
118
120 """
121 Job for building auxmvc feature vectors.
122 """
def __init__(self, cp, main_channel, channels=None, unsafe_channels=None):
    """
    Job for building auxmvc feature vectors.

    @param cp: ConfigParser with a [build_auxmvc_vectors] section.
    @param main_channel: name of the main (target) channel.
    @param channels: optional channel list passed as --channels.
    @param unsafe_channels: optional list passed as --unsafe-channels.
    """
    auxmvc_analysis_job.__init__(
        self,
        cp,
        ['build_auxmvc_vectors'],
        'idq_build_auxmvc_vectors',
        tag_base='build_vectors',
    )
    self.add_opt('main-channel', main_channel)
    if channels:
        self.add_opt('channels', channels)
    if unsafe_channels:
        self.add_opt('unsafe-channels', unsafe_channels)
133
134
136 """
137 Dag node for building auxmvc feature vector.
138 """
def __init__(self, job, trigdir, gps_start_time, gps_end_time, output_file, dq_segments=None, dq_segments_name="", p_node=[]):
    """
    Dag node for building auxmvc feature vectors.

    @param job: build_auxmvc_vectors_job instance.
    @param trigdir: directory containing trigger files.
    @param gps_start_time: segment start (GPS seconds).
    @param gps_end_time: segment end (GPS seconds).
    @param output_file: path of the feature-vector file to produce.
    @param dq_segments: optional data-quality segments file.
    @param dq_segments_name: DQ flag name inside dq_segments.
    @param p_node: parent nodes this node depends on.
    """
    duration = str(gps_end_time - gps_start_time)
    log_stem = 'logs/' + job.tag_base + '-' + str(gps_start_time) + '-' + duration
    job.set_stdout_file(log_stem + '.out')
    job.set_stderr_file(log_stem + '.err')
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_output_file(output_file)
    self.add_var_opt('trigger-dir', trigdir)
    self.add_var_opt('gps-start-time', gps_start_time)
    self.add_var_opt('gps-end-time', gps_end_time)
    self.add_var_opt('output-file', output_file)
    if dq_segments:
        self.add_var_opt('dq-segments', dq_segments)
        self.add_var_opt('dq-segments-name', dq_segments_name)
    for parent in p_node:
        self.add_parent(parent)
153
155 """
156 Job for preparing training auxmvc samples.
157 """
159 """
160 """
161 sections = ['prepare_training_auxmvc_samples']
162 exec_name = 'idq_prepare_training_auxmvc_samples'
163 tag_base = 'training_auxmvc'
164 auxmvc_analysis_job.__init__(self,cp,sections,exec_name,tag_base=tag_base)
165
167 """
168 Node for preparing training auxmvc samples job.
169 """
def __init__(self, job, source_dir, gps_start_time, gps_end_time, output_file, dq_segments="", dq_segments_name="", p_node=[]):
    """
    Node for preparing training auxmvc samples job.

    @param job: prepare_training_auxmvc_samples_job instance.
    @param source_dir: directory to read samples from.
    @param gps_start_time: segment start (GPS seconds).
    @param gps_end_time: segment end (GPS seconds).
    @param output_file: .pat file to produce; also names the log files.
    @param dq_segments: optional data-quality segments file.
    @param dq_segments_name: DQ flag name (only used together with dq_segments).
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(output_file)[1]
    job.set_stdout_file('logs/' + base.replace('.pat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.pat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_output_file(output_file)
    self.add_var_opt('source-directory', source_dir)
    self.add_var_opt('gps-start-time', gps_start_time)
    self.add_var_opt('gps-end-time', gps_end_time)
    if dq_segments and dq_segments_name:
        self.add_var_opt('dq-segments', dq_segments)
        self.add_var_opt('dq-segments-name', dq_segments_name)
    self.add_var_opt('output-file', output_file)
    for parent in p_node:
        self.add_parent(parent)
184
185
187 """
188 Job for preparing training auxmvc samples.
189 """
191 """
192 """
193 sections = ['add_file_to_cache']
194 exec_name = 'add_file_to_cache'
195 tag_base = 'add_file_to_cache'
196 auxmvc_analysis_job.__init__(self, cp, sections, exec_name, tag_base=tag_base)
197
199 """
200 Node for preparing training auxmvc samples job.
201 """
def __init__(self, job, files, cache, p_node=[]):
    """
    Node that appends the given files to a cache file.

    Command line is: <cache> <file1> <file2> ...

    @param job: add_file_to_cache_job instance.
    @param files: non-empty list of file paths to add.
    @param cache: path of the cache file.
    @param p_node: parent nodes this node depends on.
    """
    stem = os.path.split(files[0])[1].split(".")[0]
    job.set_stdout_file('logs/' + stem + '_adding_to_cache.out')
    job.set_stderr_file('logs/' + stem + '_adding_to_cache.err')
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_var_arg(cache)
    for fname in files:
        self.add_var_arg(fname)
    for parent in p_node:
        self.add_parent(parent)
211
212
213
215 """
216 Training job for random forest (MVSC).
217 """
219 """
220 """
221 sections = ['train_forest']
222 exec_name = 'SprBaggerDecisionTreeApp'
223 tag_base = 'train_forest'
224 auxmvc_analysis_job.__init__(self,cp,sections,exec_name,tag_base=tag_base, short_opts=True)
225
226
228 """
229 Dag node for training the random forest (MVSC).
230 """
def __init__(self, job, training_data_file, trainedforest_filename, p_node=[]):
    """
    Dag node for training the random forest (MVSC).

    @param job: train_forest_job instance.
    @param training_data_file: input .pat file with training samples.
    @param trainedforest_filename: output file for the trained forest.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(training_data_file)[1]
    job.set_stdout_file('logs/' + base.replace('.pat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.pat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(training_data_file)
    self.training_data_file = self.get_input_files()[0]
    self.trainedforest = trainedforest_filename
    self.add_output_file(self.trainedforest)
    # -f <output forest file>, then the training .pat file as a positional arg
    self.add_var_opt("f", self.trainedforest, short=True)
    self.add_file_arg(" %s" % self.training_data_file)
    for parent in p_node:
        self.add_parent(parent)
243
245 """
246 Job using random forest to evaluate unclassified data.
247 """
249 """
250 """
251 sections = ['forest_evaluate']
252 exec_name = 'SprOutputWriterApp'
253 tag_base = 'forest_evaluate'
254 auxmvc_analysis_job.__init__(self,cp, sections, exec_name, tag_base=tag_base, short_opts=True)
255 self.add_short_opt("A", "")
256
258 """
259 Node for random forest evaluation job.
260 """
def __init__(self, job, trainedforest, file_to_rank, ranked_file, p_node=[]):
    """
    Node for random forest evaluation job.

    Positional arguments passed to the executable:
    <trained forest> <input .pat file> <output .dat file>

    @param job: use_forest_job instance.
    @param trainedforest: trained forest file.
    @param file_to_rank: .pat file to be ranked.
    @param ranked_file: output .dat file; also names the log files.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(ranked_file)[1]
    job.set_stdout_file('logs/' + base.replace('.dat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.dat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(trainedforest)
    self.add_input_file(file_to_rank)
    self.add_output_file(ranked_file)
    self.trainedforest, self.file_to_rank = self.get_input_files()[:2]
    self.ranked_file = ranked_file
    self.add_file_arg(" %s %s %s" % (self.trainedforest, self.file_to_rank, self.ranked_file))
    for parent in p_node:
        self.add_parent(parent)
274
275
276
278 """
279 A simple fix job that adds the variables excluded by forest (MVSC) from classification into the output file.
280 Need to be run right after use_forest job.
281 """
283 """
284 """
285 sections = ['forest_add_excluded_vars']
286 exec_name = 'forest_add_excluded_vars'
287 tag_base = 'forest_add_excluded_vars'
288 auxmvc_analysis_job.__init__(self,cp, sections, exec_name, tag_base=tag_base)
289 self.add_opt('excluded-variables', cp.get('forest_evaluate', 'z'))
290
292 """
293 Node for forest_add_excluded_vars_job.
294 """
def __init__(self, job, patfile, datfile, p_node=[]):
    """
    Node for forest_add_excluded_vars_job.

    @param job: forest_add_excluded_vars_job instance.
    @param patfile: .pat file with the excluded variables.
    @param datfile: .dat file to be amended; also names the log files.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(datfile)[1]
    job.set_stdout_file('logs/' + base.replace('.dat', 'faev.out'))
    job.set_stderr_file('logs/' + base.replace('.dat', 'faev.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(patfile)
    self.add_input_file(datfile)
    self.add_var_opt('pat-file', patfile)
    self.add_var_opt('dat-file', datfile)
    for parent in p_node:
        self.add_parent(parent)
305
306
307
308
310 """
311 Job that makes various plots and histograms using the significance of the auxiliary channels.
312 """
def __init__(self, cp):
    """
    Job that makes various plots and histograms using the significance of
    the auxiliary channels.

    @param cp: ConfigParser with a [plot-forest-channels-significance] section.
    """
    auxmvc_analysis_job.__init__(
        self,
        cp,
        ['plot-forest-channels-significance'],
        'auxmvc_plot_mvsc_channels_significance',
        tag_base='plot_channels_signif',
    )
319
321 """
322 Node for plot_channels_significance_job.
323 """
def __init__(self, job, input, p_node=[]):
    """
    Node for plot_channels_significance_job.

    @param job: plot_channels_significance_job instance.
    @param input: value passed as --input to the executable (parameter name
        shadows the builtin but is kept for caller compatibility).
    @param p_node: parent nodes this node depends on.
    """
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_var_opt("input", input)
    for parent in p_node:
        self.add_parent(parent)
329
330
331
333 """
334 Job that makes plots based on results of evaluation e.g. ROC curves.
335 """
def __init__(self, cp, tag_base='RESULT_PLOTS'):
    """
    Job that makes plots based on results of evaluation, e.g. ROC curves.

    NOTE(review): the tag_base argument is immediately overwritten below,
    so the value passed by callers is ignored; kept as-is to preserve the
    original behavior.

    @param cp: ConfigParser with a [result_plots] section.
    """
    sections = ['result_plots']
    exec_name = 'auxmvc_result_plots'
    tag_base = 'auxmvc_result_plots'
    auxmvc_analysis_job.__init__(self, cp, sections, exec_name, tag_base=tag_base)
343
344
346 """
347 Node for result_plots_job.
348 """
def __init__(self, job, datfiles, p_node=[]):
    """
    Node for result_plots_job.

    @param datfiles: sequence of indexable entries; the first element of
        each entry is added as a file argument (matches original behavior —
        presumably each entry is a 1-tuple/list of a path; TODO confirm
        against callers).
    @param p_node: parent nodes this node depends on.
    """
    pipeline.CondorDAGNode.__init__(self, job)
    for entry in datfiles:
        self.add_file_arg(entry[0])
    for parent in p_node:
        self.add_parent(parent)
355
356
357
358
359
361 """
362 """
364 """
365 """
366 sections = ['svm_evaluate']
367 exec_name = 'svm_evaluate_cmd'
368 tag_base = 'svm_evaluate'
369 auxmvc_analysis_job.__init__(self, cp, sections, exec_name, tag_base=tag_base)
370
371
373 """
374 Node for SVM evaluation job.
375 """
def __init__(self, job, cp, test_file, range_file, svm_model, predict_file, p_node=[]):
    """
    Node for SVM evaluation job.

    @param job: auxmvc_svm_evaluate job instance.
    @param cp: ConfigParser object (unused here; kept for caller compatibility).
    @param test_file: input file with samples to evaluate.
    @param range_file: scaling-range file produced by training.
    @param svm_model: trained SVM model file.
    @param predict_file: output file with predictions; also names the logs.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(predict_file)[1]
    job.set_stdout_file('logs/' + base.replace('.dat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.dat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(test_file)
    self.add_input_file(range_file)
    self.add_input_file(svm_model)
    self.add_output_file(predict_file)

    self.test_file, self.range_file, self.svm_model = self.get_input_files()[:3]
    self.predict_file = self.get_output_files()[0]
    # short options consumed by the svm_evaluate command
    self.add_var_opt('i', self.test_file, short=True)
    self.add_var_opt('r', self.range_file, short=True)
    self.add_var_opt('m', self.svm_model, short=True)
    self.add_var_opt('o', self.predict_file, short=True)

    for parent in p_node:
        self.add_parent(parent)
398
399
401 """
402 Training job for svm.
403 """
405 """
406 """
407 sections = ['svm_train']
408 exec_name = 'svm_train_cmd'
409 tag_base = 'svm_train'
410 auxmvc_analysis_job.__init__(self, cp, sections, exec_name, tag_base=tag_base)
411
412
414 """
415 Node for SVM train job.
416 """
def __init__(self, job, cp, train_file, range_file, model_file, p_node=[]):
    """
    Node for SVM train job.

    @param job: auxmvc_svm_train job instance.
    @param cp: ConfigParser object (unused here; kept for caller compatibility).
    @param train_file: input .pat file with training samples.
    @param range_file: output scaling-range file.
    @param model_file: output SVM model file.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(train_file)[1]
    job.set_stdout_file('logs/' + base.replace('.pat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.pat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(train_file)
    self.add_output_file(range_file)
    self.add_output_file(model_file)

    self.train_file = self.get_input_files()[0]
    self.range_file = self.get_output_files()[0]
    self.model_file = self.get_output_files()[1]

    # intermediate files produced during training; removed by the post script
    self.train_file_svm = os.path.abspath(self.train_file) + '.mid'
    self.scale_file = os.path.abspath(self.train_file) + '.scale'
    for opt, val in (
        ("train-file", self.train_file),
        ("train-file-svm", self.train_file_svm),
        ("scale-file", self.scale_file),
        ("range-file", self.range_file),
        ("model-file", self.model_file),
    ):
        self.add_var_opt(opt, val)

    # clean up the intermediate files once the job completes
    self.set_post_script("/bin/rm ")
    self.add_post_script_arg(self.train_file_svm)
    self.add_post_script_arg(self.scale_file)

    for parent in p_node:
        self.add_parent(parent)
446
447
448
449
451 """
452 Job for converting pat files to FANN type files.
453 """
455 """
456 """
457 sections = ['ann_convert']
458 exec_name = 'ConvertSprToFann'
459 tag_base = 'ann_convert'
460 auxmvc_analysis_job.__init__(self,cp,sections,exec_name,tag_base=tag_base, short_opts=False)
461
462
464 """
465 Dag node for converting pat files to FANN type files.
466 """
def __init__(self, job, pat_file, p_node=[]):
    """
    Dag node for converting pat files to FANN type files.

    Bug fix: the original referenced an undefined name `training_data_file`
    when building the stdout/stderr log file names (a NameError at runtime);
    the parameter is `pat_file`, so that is used instead.

    @param job: ann_convert_job instance.
    @param pat_file: input .pat file to convert; the output FANN file is
        the same path with the extension replaced by .ann.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(pat_file)[1]
    job.set_stdout_file('logs/' + base.replace('.pat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.pat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(pat_file)
    self.pat_file = self.get_input_files()[0]
    self.fann_file = pat_file.replace(".pat", ".ann")
    self.add_output_file(self.fann_file)
    self.add_file_arg(" %s" % (self.pat_file))
    for p in p_node:
        self.add_parent(p)
478
480 """
481 Training job for Artificial Neural Networks (ANN) with iRPROP- algorithm.
482 """
484 """
485 """
486 sections = ['train_ann']
487 exec_name = 'TrainNeuralNet'
488 tag_base = 'train_ann'
489 auxmvc_analysis_job.__init__(self,cp,sections,exec_name,tag_base=tag_base, short_opts=False)
490
491
493 """
494 Dag node for training the Artificial Neural Networks (ANN) with iRPROP- algorithm.
495 """
def __init__(self, job, training_data_file, trained_ann_filename, p_node=[]):
    """
    Dag node for training the Artificial Neural Network (ANN) with the
    iRPROP- algorithm.

    Command line: -t <training file> -s <output network file>

    @param job: train_ann_job instance.
    @param training_data_file: input .pat file with training samples.
    @param trained_ann_filename: output file for the trained network.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(training_data_file)[1]
    job.set_stdout_file('logs/' + base.replace('.pat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.pat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(training_data_file)
    self.training_data_file = self.get_input_files()[0]
    self.trained_ann = trained_ann_filename
    self.add_output_file(self.trained_ann)
    self.add_file_arg(" -t %s -s %s" % (self.training_data_file, self.trained_ann))
    for parent in p_node:
        self.add_parent(parent)
507
509 """
510 Job using ANN to evaluate unclassified data.
511 """
513 """
514 """
515 sections = ['ann_evaluate']
516 exec_name = 'EvaluateNeuralNet'
517 tag_base = 'ann_evaluate'
518 auxmvc_analysis_job.__init__(self,cp, sections, exec_name, tag_base=tag_base, short_opts=False)
519 self.add_short_opt("", "")
520
522 """
523 Node for ANN evaluation job.
524 """
def __init__(self, job, trained_ann, file_to_rank, ranked_file, p_node=[]):
    """
    Node for ANN evaluation job.

    Command line: -n <network> -e <evaluation input> -s <score output>

    @param job: ann_evaluate_job instance.
    @param trained_ann: trained network file.
    @param file_to_rank: input file to be ranked.
    @param ranked_file: output .dat file; also names the log files.
    @param p_node: parent nodes this node depends on.
    """
    base = os.path.split(ranked_file)[1]
    job.set_stdout_file('logs/' + base.replace('.dat', '.out'))
    job.set_stderr_file('logs/' + base.replace('.dat', '.err'))
    pipeline.CondorDAGNode.__init__(self, job)
    self.add_input_file(trained_ann)
    self.add_input_file(file_to_rank)
    self.add_output_file(ranked_file)
    self.trained_ann, self.file_to_rank = self.get_input_files()[:2]
    self.ranked_file = ranked_file
    self.add_file_arg(" -n %s -e %s -s %s" % (self.trained_ann, self.file_to_rank, self.ranked_file))
    for parent in p_node:
        self.add_parent(parent)
538