1
2
3
4
5
6
7 from Ganga.GPIDev.Adapters.IMerger import MergerError, IMerger
8 from Ganga.GPIDev.Base import GangaObject
9 from Ganga.GPIDev.Base.Proxy import GPIProxyObject
10 from Ganga.GPIDev.Schema import ComponentItem, FileItem, Schema, SimpleItem, Version
11
12 from Ganga.Utility.Config import makeConfig, ConfigError, getConfig
13 from Ganga.Utility.Plugin import allPlugins
14 from Ganga.Utility.logging import getLogger, log_user_exception
15 import commands
16 import os
17 import string
18
# module-level logger shared by everything in this file
logger = getLogger()


# [Mergers] configuration section: maps file extensions to Merger types,
# sets the default output directory for stand-alone merges, and names the
# fallback merger used for extension-less files such as stdout/stderr
config = makeConfig('Mergers','parameters for mergers')
config.addOption('associate',"{'log':'TextMerger','root':'RootMerger',"
    "'text':'TextMerger','txt':'TextMerger'}",'Dictionary of file associations')
config.addOption('merge_output_dir','~/gangadir/merge_results',"location of the merger's outputdir")
config.addOption('std_merge','TextMerger','Standard (default) merger')
27
38
63
65 """Method to run the merge command."""
66
67 result = False
68
69
70 if job._getParent() != None:
71 return result
72
73 allowed_states = ['completed','failed','killed']
74 if not new_status in allowed_states:
75 return result
76
77 try:
78 if job.merger:
79
80 if new_status == allowed_states[0] or job.merger.ignorefailed:
81
82
83 sum_outputdir = None
84 if job.merger.set_outputdir_for_automerge:
85 sum_outputdir = job.outputdir
86
87 result = job.merger.merge(job.subjobs, sum_outputdir)
88
89 except Exception:
90 log_user_exception()
91 raise
92
93 return result
94
95
96
112
114 """
115 The idea behind this class is to put all of the checking and user interaction in this class, and then use a very simple
116 stateless merge_tool to actually do the relevant merge. The Abstract label is perhaps misleading, but I intend all Merger
117 objects to inherit from this.
118 """
119 _schema = Schema(Version(1,0), {
120 'files' : SimpleItem(defvalue = [], typelist=['str'], sequence = 1, doc='A list of files to merge.'),
121 'merge_tool' : ComponentItem('merge_tools', defvalue = None, doc='The merge tool to use.', hidden = 1),
122 'ignorefailed' : SimpleItem(defvalue = False, doc='Jobs that are in the failed or killed states will be excluded from the merge when this flag is set to True.'),
123 'overwrite' : SimpleItem(defvalue = False, doc='The default behaviour for this Merger object. Will overwrite output files.')
124 } )
125 _category = 'mergers'
126 _name = 'AbstractMerger'
127 _hidden = 1
128
129 _GUIPrefs = [{'attribute' : 'ignorefailed', 'widget' : 'Bool'},
130 {'attribute' : 'overwrite', 'widget' : 'Bool'}]
131
132 success = True
133 failure = False
134
138
139 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
140 """
141 Method to merge the output of jobs.
142
143 jobs may be a single job instance or a sequence of Jobs
144 outputdir is the name of the directry to put the merge results in. It will be created if needed.
145 ignorefailed and overwrite have the same meaning as in the schema, but override the schema values.
146
147 returns whether the merge was successful or not as a boolean
148 """
149
150 if ignorefailed == None:
151 ignorefailed = self.ignorefailed
152
153 if overwrite == None:
154 overwrite = self.overwrite
155
156
157 from Ganga.GPIDev.Lib.Job import Job
158 if isinstance(jobs,GPIProxyObject) and isinstance(jobs._impl,Job):
159 if outputdir is None:
160 outputdir = jobs.outputdir
161 return self.merge(jobs.subjobs,outputdir = outputdir, ignorefailed = ignorefailed, overwrite = overwrite)
162
163 if len(jobs) == 0:
164 logger.warning('The jobslice given was empty. The merge will not continue.')
165 return self.success
166
167 if not outputdir:
168 outputdir = getDefaultMergeDir()
169 else:
170 if isinstance(outputdir,GPIProxyObject) and isinstance(outputdir._impl,Job):
171
172 outputdir = outputdir.outputdir
173 else:
174 outputdir = os.path.expanduser(outputdir)
175
176 files = {}
177 for f in self.files:
178 files[f] = []
179 for j in jobs:
180
181 if j.status != 'completed':
182
183 if j.status == 'failed' or j.status == 'killed':
184 if ignorefailed:
185 logger.warning('Job %s has status %s and is being ignored.', j.fqid, j.status)
186 continue
187 else:
188 logger.error('Job %s has status %s and so the merge can not continue. '\
189 'This can be overridden with the ignorefailed flag.', j.fqid, j.status)
190 return self.failure
191 else:
192 logger.error("Job %s is in an unsupported status %s and so the merge can not continue. '\
193 'Supported statuses are 'completed', 'failed' or 'killed' (if the ignorefailed flag is set).", j.fqid, j.status)
194 return self.failure
195
196
197 if not j.merger and len(j.subjobs):
198 sub_result = self.merge(j.subjobs,outputdir = j.outputdir, ignorefailed = ignorefailed, overwrite = overwrite)
199 if (sub_result == self.failure) and not ignorefailed:
200 logger.error('The merge of Job %s failed and so the merge can not continue. '\
201 'This can be overridden with the ignorefailed flag.', j.fqid)
202 return self.failure
203
204
205 for f in files.keys():
206 p = os.path.join(j.outputdir,f)
207 if not os.path.exists(p):
208 if ignorefailed:
209 logger.warning('The file %s in Job %s was not found. The file will be ignored.',str(f),j.fqid)
210 continue
211 else:
212 logger.error('The file %s in Job %s was not found and so the merge can not continue. '\
213 'This can be overridden with the ignorefailed flag.', str(f), j.fqid)
214 return self.failure
215 files[f].append(p)
216
217 for k in files.keys():
218
219 outputfile = os.path.join(outputdir,k)
220
221 if os.path.exists(outputfile) and not overwrite:
222 logger.error('The merge process can not continue as it will result in over writing. '\
223 'Either move the file %s or set the overwrite flag to True.', str(outputfile))
224 return self.failure
225
226
227 if not os.path.exists(outputdir):
228 os.mkdir(outputdir)
229
230
231 outputfile_dirname = os.path.dirname(outputfile)
232 if outputfile_dirname != outputdir:
233 if not os.path.exists(outputfile_dirname):
234 os.mkdir(outputfile_dirname)
235
236
237 if not files[k]:
238 logger.warning('Attempting to merge with no files. Request will be ignored.')
239 continue
240
241
242 for f in files[k]:
243 if f == outputfile:
244 logger.error('Output file %s equals input file %s. The merge will fail.',
245 outputfile, f)
246 return self.failure
247
248
249 msg = None
250 try:
251 self.merge_tool.mergefiles(files[k],outputfile)
252
253
254
255 log_file = '%s.merge_summary' % outputfile
256 log = file(log_file,'w')
257 try:
258 log.write('# -- List of files merged -- #\n')
259 for f in files[k]:
260 log.write('%s\n' % f)
261 log.write('# -- End of list -- #\n')
262 finally:
263 log.close()
264
265 except MergerError, e:
266 msg = str(e)
267
268
269 log_file = '%s.merge_summary' % outputfile
270 log = file(log_file,'w')
271 try:
272 log.write('# -- Error in Merge -- #\n')
273 log.write('\t%s\n' % msg)
274 finally:
275 log.close()
276 raise e
277
278 return self.success
279
280
282 """Very simple merge tool to cat together text files. Adds a few simple headers."""
283 _category = 'merge_tools'
284 _hidden = 1
285 _name = '_TextMergeTool'
286 _schema = IMergeTool._schema.inherit_copy()
287 _schema.datadict['compress'] = SimpleItem(defvalue = False, doc='Output should be compressed with gzip.')
288
290
291 import time
292
293 if self.compress or output_file.lower().endswith('.gz'):
294
295 import gzip
296 if not output_file.lower().endswith('.gz'):
297 output_file += '.gz'
298 out_file = gzip.GzipFile(output_file,'w')
299 else:
300 out_file = file(output_file,'w')
301
302 out_file.write('# Ganga TextMergeTool - %s #\n' % time.asctime())
303 for f in file_list:
304
305 if not f.lower().endswith('.gz'):
306 in_file = file(f)
307 else:
308 import gzip
309 in_file = gzip.GzipFile(f)
310
311 out_file.write('# Start of file %s #\n' % str(f))
312 out_file.write(in_file.read())
313 out_file.write('\n')
314
315 in_file.close()
316
317 out_file.write('# Ganga Merge Ended Successfully #\n')
318 out_file.flush()
319 out_file.close()
320
362
363
364
class TextMerger(AbstractMerger):
    """Merger class for text

    TextMerger will append specified text files in the order that they are
    encountered in the list of Jobs. Each file will be separated by a header
    giving some very basic information about the individual files.

    Usage:

    tm = TextMerger()
    tm.files = ['job.log','results.txt']
    tm.overwrite = True #False by default
    tm.ignorefailed = True #False by default

    # will produce the specified files
    j = Job()
    j.outputsandbox = ['job.log','results.txt']
    j.splitter = SomeSplitter()
    j.merger = tm
    j.submit()

    The merge object will be used to merge the output of
    each subjob into j.outputdir. This will be run when
    the job completes. If the ignorefailed flag has been set
    then the merge will also be run as the job enters the
    killed or failed states.

    The above merger object can also be used independently
    to merge a list of jobs or the subjobs of a single job.

    #tm defined above
    tm.merge(j, outputdir = '~/merge_dir')
    tm.merge([.. list of jobs ...], '~/merge_dir', ignorefailed = True, overwrite = False)

    If ignorefailed or overwrite are set then they override the values set on the
    merge object.

    If outputdir is not specified, the default location specified
    in the [Mergers] section of the .gangarc file will be used.

    For large text files it may be desirable to compress the merge
    result using gzip. This can be done by setting the compress
    flag on the TextMerger object. In this case, the merged file
    will have a '.gz' appended to its filename.

    A summary of all the files merged will be created for each entry in files.
    This will be created when the merge of those files completes
    successfully. The name of this is the same as the output file, with the
    '.merge_summary' extension appended and will be placed in the same directory
    as the merge results.

    """
    _category = 'mergers'
    _exportmethods = ['merge']
    _name = 'TextMerger'
    # inherit files/ignorefailed/overwrite from AbstractMerger and add compress
    _schema = AbstractMerger._schema.inherit_copy()
    _schema.datadict['compress'] = SimpleItem(defvalue = False, doc='Output should be compressed with gzip.')
422
423
424 - def __init__(self):
426
427 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
431
433 """Merger class for ROOT files
434
435 RootMerger will use the version of ROOT configured in the .gangarc file to
436 add together histograms and trees using the 'hadd' command provided by ROOT.
437 Further details of the hadd command can be found in the ROOT documentation.
438
439 Usage:
440
441 rm = RootMerger()
442 rm.files = ['hist.root','trees.root']
443 rm.overwrite = True #False by default
444 rm.ignorefailed = True #False by default
445 rm.args = '-f2' #pass arguments to hadd
446
447 # will produce the specified files
448 j = Job()
449 j.outputsandbox = ['hist.root','trees.root']
450 j.splitter = SomeSplitter()
451 j.merger = rm
452 j.submit()
453
454 The merge object will be used to merge the output of
455 each subjob into j.outputdir. This will be run when
456 the job completes. If the ignorefailed flag has been set
457 then the merge will also be run as the job enters the
458 killed or failed states.
459
460 The above merger object can also be used independently
461 to merge a list of jobs or the subjobs of an single job.
462
463 #rm defined above
464 rm.merge(j, outputdir = '~/merge_dir')
465 rm.merge([.. list of jobs ...], '~/merge_dir', ignorefailed = True, overwrite = False)
466
467 If ignorefailed or overwrite are set then they override the
468 values set on the merge object.
469
470 A summary of all the files merged will be created for each entry in files.
471 This will be created when the merge of those files completes
472 successfully. The name of this is the same as the output file, with the
473 '.merge_summary' extension appended and will be placed in the same directory
474 as the merge results.
475
476 If outputdir is not specified, the default location specfied
477 in the [Mergers] section of the .gangarc file will be used.
478
479 """
480
481 _category = 'mergers'
482 _exportmethods = ['merge']
483 _name = 'RootMerger'
484 _schema = AbstractMerger._schema.inherit_copy()
485 _schema.datadict['args'] = SimpleItem(defvalue = None, doc='Arguments to be passed to hadd.',\
486 typelist=['str','type(None)'])
487
490
491 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
495
496
498 """Merger class when merges of different file types are needed.
499
500 Here is a typical usage example:
501
502 # job produces both Root and Text files
503 j = Job()
504
505 tm = TextMerger()
506 tm.files = ['job.log','stdout']
507 tm.overwrite = True
508
509 rm = RootMerger()
510 rm.files = ['histo.root','tree.root']
511 rm.ignorefailed = True
512
513 mm = MultipleMerger()
514 mm.addMerger(tm)
515 mm.addMerger(rm)
516
517 j.merger = mm
518 # All files will be merged on completion
519 j.submit()
520
521 MultipleMerger objects can also be used on
522 individual Jobs or lists of Jobs.
523
524 #mm defined above
525 mm.merge([..list of Jobs ...], outputdir = '~/merge_results', ignorefailed = False, overwrite = True)
526
527 The ignorefailed and overwrite flags are
528 propagated to the individual Merger objects.
529
530 If outputdir is not specified, the default location
531 specfied in the [Mergers] section of the .gangarc
532 file will be used.
533
534 It is permissible to nest MultipleMerger objects
535 inside one another if extra hierarchy is desired.
536
537 """
538
539 _category = 'mergers'
540 _exportmethods = ['addMerger','merge']
541 _name = 'MultipleMerger'
542 _schema = Schema(Version(1,0), {
543 'merger_objects' : ComponentItem('mergers', defvalue = [], doc = 'A list of Merge objects to run', sequence = 1)
544 })
545
546 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
557
559 """Adds a merger object to the list of merges to be done."""
560 self.merger_objects.append(merger_object)
561
563 """Look at a list of jobs and find a set of files present in each job that can be merged together"""
564
565 result = []
566
567 file_map = {}
568 jobs_len = len(jobs)
569 for j in jobs:
570 for file_name in j.outputsandbox:
571 file_map[file_name] = file_map.setdefault(file_name,0) + 1
572
573 for file_name, count in file_map.iteritems():
574 if count == jobs_len: result.append(file_name)
575 else:
576 logger.warning('The file %s was not found in all jobs to be merged and so will be ignored.', file_name)
577 logger.info('No files specified, so using %s.', str(result))
578
579 return result
580
581
583 """Allows the different types of merge to be run according to file extension in an automatic way.
584
585 SmartMerger accepts a list of files which it will delegate to individual Merger objects based on
586 the file extension of the file. The mapping between file extensions and Merger objects can
587 be defined in the [Mergers] section of the .gangarc file. Extensions are treated in a case
588 insensitive way. If a file extension is not recognized than the file will be ignored if the
589 ignorefailed flag is set, or the merge will fail.
590
591 Example:
592
593 sm = SmartMerger()
594 sm.files = ['stderr','histo.root','job.log','summary.txt','trees.root','stdout']
595 sm.merge([... list of jobs ...], outputdir = '~/merge_dir')#also accepts a single Job
596
597 If outputdir is not specified, the default location specfied in the [Mergers]
598 section of the .gangarc file will be used.
599
600 If files is not specified, then it will be taken from the list of jobs given to
601 the merge method. Only files which appear in all jobs will be merged.
602
603 SmartMergers can also be attached to Job objects in the same way as other Merger
604 objects.
605
606 #sm defined above
607 j = Job()
608 j.splitter = SomeSplitter()
609 j.merger = sm
610 j.submit()
611
612 """
613
614 _category = 'mergers'
615 _exportmethods = ['merge']
616 _name = 'SmartMerger'
617 _schema = Schema(Version(1,0), {
618 'files' : SimpleItem(defvalue = [], typelist=['str'], sequence = 1, doc='A list of files to merge.'),
619 'ignorefailed' : SimpleItem(defvalue = False, doc='Jobs that are in the failed or killed states will be excluded from the merge when this flag is set to True.'),
620 'overwrite' : SimpleItem(defvalue = False, doc='The default behaviour for this Merger object. Will overwrite output files.')
621 } )
622
623 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
624
625
626 if ignorefailed == None:
627 ignorefailed = self.ignorefailed
628
629 if overwrite == None:
630 overwrite = self.overwrite
631
632
633 from Ganga.GPIDev.Lib.Job import Job
634 if isinstance(jobs,Job):
635 if outputdir is None:
636 outputdir = jobs.outputdir
637 return self.merge(jobs.subjobs,outputdir = outputdir, ignorefailed = ignorefailed, overwrite = overwrite)
638
639 if len(jobs) == 0:
640 logger.warning('The jobslice given was empty. The merge will not continue.')
641 return AbstractMerger.success
642
643
644 if not self.files:
645 self.files = findFilesToMerge(jobs)
646
647 type_map = {}
648 for f in self.files:
649
650 if not getMergerObject(f):
651
652
653 file_ext = os.path.splitext(f)[1].lstrip('.')
654
655
656 if not file_ext:
657 if f in ['stdout','stderr']:
658 file_ext = 'std_merge'
659 elif ignorefailed:
660 logger.warning('File extension not found for file %s and so the file will be ignored. '\
661 'Check the name of the file.',f)
662 continue
663 else:
664 logger.warning('File extension not found for file %s and so the merge will fail. '\
665 'Check the name of the file or set the ignorefailed flag.',f)
666 return AbstractMerger.failure
667
668 file_ext = file_ext.lower()
669
670 else:
671
672 file_ext = f
673
674
675 type_map.setdefault(file_ext, []).append(f)
676
677
678 multi_merge = MultipleMerger()
679 for ext in type_map.keys():
680
681 merge_object = getMergerObject(ext)
682 if merge_object == None:
683 logger.error('Extension %s not recognized and so the merge will fail. '\
684 'Check the [Mergers] section of your .gangarc file.', ext)
685 return AbstractMerger.failure
686
687
688 merge_object.files = type_map[ext]
689 merge_object.ignorefailed = ignorefailed
690 merge_object.overwrite = overwrite
691
692 multi_merge.addMerger(merge_object)
693
694 return multi_merge.merge(jobs, outputdir = outputdir, ignorefailed = ignorefailed, overwrite = overwrite)
695
747
748
749
751 """User tool for writing custom merging tools with Python
752
753 Allows a script to be supplied that performs the merge of some custom file type.
754 The script must be a python file which defines the following function:
755
756 def mergefiles(file_list, output_file):
757
758 #perform the merge
759 if not success:
760 return -1
761 else:
762 return 0
763
764 This module will be imported and used by the CustomMerger. The file_list is a
765 list of paths to the files to be merged. output_file is a string path for
766 the output of the merge. This file must exist by the end of the merge or the
767 merge will fail. If the merge cannot proceed, then the function should return a
768 non-zero integer.
769
770 Clearly this tool is provided for advanced ganga usage only, and should be used with
771 this in mind.
772
773 """
774 _category = 'mergers'
775 _exportmethods = ['merge']
776 _name = 'CustomMerger'
777 _schema = AbstractMerger._schema.inherit_copy()
778 _schema.datadict['module'] = FileItem(defvalue = None, doc='Path to a python module to perform the merge.')
779
780
783
784 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
791
792
793
794
# register the internal merge tools with the plugin system so the
# ComponentItem('merge_tools') schema entries can resolve them
allPlugins.add(_CustomMergeTool,'merge_tools','_CustomMergeTool')
allPlugins.add(_TextMergeTool,'merge_tools','_TextMergeTool')
allPlugins.add(_RootMergeTool,'merge_tools','_RootMergeTool')

# plain text merging is the default tool
allPlugins.setDefault('merge_tools','_TextMergeTool')
800