Package Ganga :: Package Lib :: Package Mergers :: Module Merger
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.Mergers.Merger

  1  ################################################################################ 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: Merger.py,v 1.5 2009-03-18 10:46:01 wreece Exp $ 
  5  ################################################################################ 
  6   
  7  from Ganga.GPIDev.Adapters.IMerger import MergerError, IMerger 
  8  from Ganga.GPIDev.Base import GangaObject 
  9  from Ganga.GPIDev.Base.Proxy import GPIProxyObject 
 10  from Ganga.GPIDev.Schema import ComponentItem, FileItem, Schema, SimpleItem, Version 
 11   
 12  from Ganga.Utility.Config import makeConfig, ConfigError, getConfig 
 13  from Ganga.Utility.Plugin import allPlugins 
 14  from Ganga.Utility.logging import getLogger, log_user_exception 
 15  import commands 
 16  import os 
 17  import string 
 18   
 19  logger = getLogger() 
 20   
 21  #set the mergers config up 
 22  config = makeConfig('Mergers','parameters for mergers') 
 23  config.addOption('associate',"{'log':'TextMerger','root':'RootMerger'," 
 24                   "'text':'TextMerger','txt':'TextMerger'}",'Dictionary of file associations') 
 25  config.addOption('merge_output_dir','~/gangadir/merge_results',"location of the merger's outputdir") 
 26  config.addOption('std_merge','TextMerger','Standard (default) merger') 
 27   
28 -def getDefaultMergeDir():
29 """Gets the default location of the mergers outputdir from the config""" 30 31 outputdir = "~/gangadir/merge_results" 32 try: 33 config = getConfig('Mergers') 34 outputdir = config['merge_output_dir'] 35 except ConfigError: 36 pass 37 return os.path.expanduser(outputdir)
38
39 -def getMergerObject(file_ext):
40 """Returns an instance of the correct merger tool, or None if there is not one""" 41 from Ganga.Utility.Plugin.GangaPlugin import PluginManagerError 42 result = None 43 try: 44 config = getConfig('Mergers') 45 46 if file_ext == 'std_merge': 47 result = allPlugins.find('mergers',config[file_ext])() 48 else: 49 #load the dictionary of file assocaitions 50 file_types = eval(config['associate']) 51 result = allPlugins.find('mergers',file_types[file_ext])() 52 except ConfigError: 53 pass 54 except KeyError: 55 pass 56 except PluginManagerError: 57 pass 58 except SyntaxError: 59 pass 60 except TypeError:#TypeError as we may not be able to call () 61 pass #just return None 62 return result
63
64 -def runAutoMerge(job, new_status):
65 """Method to run the merge command.""" 66 67 result = False 68 69 #we only run on master jobs (which have no parent) 70 if job._getParent() != None: 71 return result 72 73 allowed_states = ['completed','failed','killed'] 74 if not new_status in allowed_states: 75 return result 76 77 try: 78 if job.merger: 79 #we run if master is in a failed state if ignorefailed flag is set 80 if new_status == allowed_states[0] or job.merger.ignorefailed: 81 82 # leave the output directory to the implementation (fix for http://savannah.cern.ch/bugs/?76445) 83 sum_outputdir = None 84 if job.merger.set_outputdir_for_automerge: 85 sum_outputdir = job.outputdir 86 87 result = job.merger.merge(job.subjobs, sum_outputdir) 88 89 except Exception: 90 log_user_exception() 91 raise 92 93 return result
94 95 96
97 -class IMergeTool(GangaObject):
98 """This is an interface class for the a stateless merge tool. Concrete merge tools should inherit from it""" 99 _category = 'merge_tools' 100 _hidden = 1 101 _name = 'IMergeTool' 102 _schema = Schema(Version(1,0), {}) 103
104 - def mergefiles(self, file_list, output_file):
105 """ 106 file_list: A list of fully qualified file names that should be merged together. 107 output_file: The name of the file to write the merge results to. 108 109 If the merge fails for any reason, then a MergerError should be thrown. 110 """ 111 raise NotImplementedError
112
113 -class AbstractMerger(IMerger):
114 """ 115 The idea behind this class is to put all of the checking and user interaction in this class, and then use a very simple 116 stateless merge_tool to actually do the relevant merge. The Abstract label is perhaps misleading, but I intend all Merger 117 objects to inherit from this. 118 """ 119 _schema = Schema(Version(1,0), { 120 'files' : SimpleItem(defvalue = [], typelist=['str'], sequence = 1, doc='A list of files to merge.'), 121 'merge_tool' : ComponentItem('merge_tools', defvalue = None, doc='The merge tool to use.', hidden = 1), 122 'ignorefailed' : SimpleItem(defvalue = False, doc='Jobs that are in the failed or killed states will be excluded from the merge when this flag is set to True.'), 123 'overwrite' : SimpleItem(defvalue = False, doc='The default behaviour for this Merger object. Will overwrite output files.') 124 } ) 125 _category = 'mergers' 126 _name = 'AbstractMerger' 127 _hidden = 1 128 129 _GUIPrefs = [{'attribute' : 'ignorefailed', 'widget' : 'Bool'}, 130 {'attribute' : 'overwrite', 'widget' : 'Bool'}] 131 132 success = True 133 failure = False 134
135 - def __init__(self, merge_tool):
136 super(AbstractMerger,self).__init__() 137 self.merge_tool = merge_tool
138
139 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
140 """ 141 Method to merge the output of jobs. 142 143 jobs may be a single job instance or a sequence of Jobs 144 outputdir is the name of the directry to put the merge results in. It will be created if needed. 145 ignorefailed and overwrite have the same meaning as in the schema, but override the schema values. 146 147 returns whether the merge was successful or not as a boolean 148 """ 149 150 if ignorefailed == None: 151 ignorefailed = self.ignorefailed 152 153 if overwrite == None: 154 overwrite = self.overwrite 155 156 # special case the passing of a Job object. 157 from Ganga.GPIDev.Lib.Job import Job 158 if isinstance(jobs,GPIProxyObject) and isinstance(jobs._impl,Job): 159 if outputdir is None: 160 outputdir = jobs.outputdir 161 return self.merge(jobs.subjobs,outputdir = outputdir, ignorefailed = ignorefailed, overwrite = overwrite) 162 163 if len(jobs) == 0: 164 logger.warning('The jobslice given was empty. The merge will not continue.') 165 return self.success 166 167 if not outputdir: 168 outputdir = getDefaultMergeDir() 169 else: 170 if isinstance(outputdir,GPIProxyObject) and isinstance(outputdir._impl,Job): 171 #use info from job 172 outputdir = outputdir.outputdir 173 else: 174 outputdir = os.path.expanduser(outputdir) 175 176 files = {} 177 for f in self.files: 178 files[f] = [] 179 for j in jobs: 180 #first check that the job is ok 181 if j.status != 'completed': 182 #check if we can keep going 183 if j.status == 'failed' or j.status == 'killed': 184 if ignorefailed: 185 logger.warning('Job %s has status %s and is being ignored.', j.fqid, j.status) 186 continue 187 else: 188 logger.error('Job %s has status %s and so the merge can not continue. '\ 189 'This can be overridden with the ignorefailed flag.', j.fqid, j.status) 190 return self.failure 191 else: 192 logger.error("Job %s is in an unsupported status %s and so the merge can not continue. '\ 193 'Supported statuses are 'completed', 'failed' or 'killed' (if the ignorefailed flag is set).", j.fqid, j.status) 194 return self.failure 195 196 #run the merge recursively 197 if not j.merger and len(j.subjobs): 198 sub_result = self.merge(j.subjobs,outputdir = j.outputdir, ignorefailed = ignorefailed, overwrite = overwrite) 199 if (sub_result == self.failure) and not ignorefailed: 200 logger.error('The merge of Job %s failed and so the merge can not continue. '\ 201 'This can be overridden with the ignorefailed flag.', j.fqid) 202 return self.failure 203 204 205 for f in files.keys(): 206 p = os.path.join(j.outputdir,f) 207 if not os.path.exists(p): 208 if ignorefailed: 209 logger.warning('The file %s in Job %s was not found. The file will be ignored.',str(f),j.fqid) 210 continue 211 else: 212 logger.error('The file %s in Job %s was not found and so the merge can not continue. '\ 213 'This can be overridden with the ignorefailed flag.', str(f), j.fqid) 214 return self.failure 215 files[f].append(p) 216 217 for k in files.keys(): 218 # make sure we are not going to over write anything 219 outputfile = os.path.join(outputdir,k) 220 221 if os.path.exists(outputfile) and not overwrite: 222 logger.error('The merge process can not continue as it will result in over writing. '\ 223 'Either move the file %s or set the overwrite flag to True.', str(outputfile)) 224 return self.failure 225 226 #make the directory if it does not exist 227 if not os.path.exists(outputdir): 228 os.mkdir(outputdir) 229 230 #recreate structure from output sandbox 231 outputfile_dirname = os.path.dirname(outputfile) 232 if outputfile_dirname != outputdir: 233 if not os.path.exists(outputfile_dirname): 234 os.mkdir(outputfile_dirname) 235 236 #check that we are merging some files 237 if not files[k]: 238 logger.warning('Attempting to merge with no files. Request will be ignored.') 239 continue 240 241 #check outputfile != inputfile 242 for f in files[k]: 243 if f == outputfile: 244 logger.error('Output file %s equals input file %s. The merge will fail.', 245 outputfile, f) 246 return self.failure 247 248 #merge the lists of files with a merge tool into outputfile 249 msg = None 250 try: 251 self.merge_tool.mergefiles(files[k],outputfile) 252 253 #create a log file of the merge 254 #we only get to here if the merge_tool ran ok 255 log_file = '%s.merge_summary' % outputfile 256 log = file(log_file,'w') 257 try: 258 log.write('# -- List of files merged -- #\n') 259 for f in files[k]: 260 log.write('%s\n' % f) 261 log.write('# -- End of list -- #\n') 262 finally: 263 log.close() 264 265 except MergerError, e: 266 msg = str(e) 267 268 #store the error msg 269 log_file = '%s.merge_summary' % outputfile 270 log = file(log_file,'w') 271 try: 272 log.write('# -- Error in Merge -- #\n') 273 log.write('\t%s\n' % msg) 274 finally: 275 log.close() 276 raise e 277 278 return self.success
279 280
281 -class _TextMergeTool(IMergeTool):
282 """Very simple merge tool to cat together text files. Adds a few simple headers.""" 283 _category = 'merge_tools' 284 _hidden = 1 285 _name = '_TextMergeTool' 286 _schema = IMergeTool._schema.inherit_copy() 287 _schema.datadict['compress'] = SimpleItem(defvalue = False, doc='Output should be compressed with gzip.') 288
289 - def mergefiles(self, file_list, output_file):
290 291 import time 292 293 if self.compress or output_file.lower().endswith('.gz'): 294 #use gzip 295 import gzip 296 if not output_file.lower().endswith('.gz'): 297 output_file += '.gz' 298 out_file = gzip.GzipFile(output_file,'w') 299 else: 300 out_file = file(output_file,'w') 301 302 out_file.write('# Ganga TextMergeTool - %s #\n' % time.asctime()) 303 for f in file_list: 304 305 if not f.lower().endswith('.gz'): 306 in_file = file(f) 307 else: 308 import gzip 309 in_file = gzip.GzipFile(f) 310 311 out_file.write('# Start of file %s #\n' % str(f)) 312 out_file.write(in_file.read()) 313 out_file.write('\n') 314 315 in_file.close() 316 317 out_file.write('# Ganga Merge Ended Successfully #\n') 318 out_file.flush() 319 out_file.close()
320
321 -class _RootMergeTool(IMergeTool):
322 """Wrapper around hadd that merges root files.""" 323 324 _category = 'merge_tools' 325 _hidden = 1 326 _name = '_RootMergeTool' 327 _schema = IMergeTool._schema.inherit_copy() 328 _schema.datadict['args'] = SimpleItem(defvalue = None, doc='Arguments to be passed to hadd.',\ 329 typelist=['str','type(None)']) 330
331 - def mergefiles(self, file_list, output_file):
332 333 from Ganga.Utility.root import getrootprefix, checkrootprefix 334 rc, rootprefix = getrootprefix() 335 336 if rc != 0: 337 raise MergerError('ROOT has not been properly configured. Check your .gangarc file.') 338 339 if checkrootprefix(): 340 raise MergerError('Can not run ROOT correctly. Check your .gangarc file.') 341 342 #we always force as the overwrite is handled by our parent 343 default_arguments = '-f' 344 merge_cmd = rootprefix + 'hadd ' 345 if self.args: #pass any args on 346 merge_cmd += ' %s ' % self.args 347 348 #don't add a -f unless needed 349 if not default_arguments in merge_cmd: 350 merge_cmd += ' %s ' % default_arguments 351 352 353 #add the list of files, output file first 354 arg_list = [output_file] 355 arg_list.extend(file_list) 356 merge_cmd += string.join(arg_list,' ') 357 358 rc, out = commands.getstatusoutput(merge_cmd) 359 if rc: 360 logger.error(out) 361 raise MergerError('The ROOT merge failed to complete. The command used was %s.' % merge_cmd)
362 363 364
365 -class TextMerger(AbstractMerger):
366 """Merger class for text 367 368 TextMerger will append specified text files in the order that they are 369 encountered in the list of Jobs. Each file will be separated by a header 370 giving some very basic information about the individual files. 371 372 Usage: 373 374 tm = TextMerger() 375 tm.files = ['job.log','results.txt'] 376 tm.overwrite = True #False by default 377 tm.ignorefailed = True #False by default 378 379 # will produce the specified files 380 j = Job() 381 j.outputsandbox = ['job.log','results.txt'] 382 j.splitter = SomeSplitter() 383 j.merger = tm 384 j.submit() 385 386 The merge object will be used to merge the output of 387 each subjob into j.outputdir. This will be run when 388 the job completes. If the ignorefailed flag has been set 389 then the merge will also be run as the job enters the 390 killed or failed states. 391 392 The above merger object can also be used independently 393 to merge a list of jobs or the subjobs of an single job. 394 395 #tm defined above 396 tm.merge(j, outputdir = '~/merge_dir') 397 tm.merge([.. list of jobs ...], '~/merge_dir', ignorefailed = True, overwrite = False) 398 399 If ignorefailed or overwrite are set then they override the values set on the 400 merge object. 401 402 If outputdir is not specified, the default location specfied 403 in the [Mergers] section of the .gangarc file will be used. 404 405 For large text files it may be desirable to compress the merge 406 result using gzip. This can be done by setting the compress 407 flag on the TextMerger object. In this case, the merged file 408 will have a '.gz' appended to its filename. 409 410 A summary of all the files merged will be created for each entry in files. 411 This will be created when the merge of those files completes 412 successfully. The name of this is the same as the output file, with the 413 '.merge_summary' extension appended and will be placed in the same directory 414 as the merge results. 415 416 """ 417 _category = 'mergers' 418 _exportmethods = ['merge'] 419 _name = 'TextMerger' 420 _schema = AbstractMerger._schema.inherit_copy() 421 _schema.datadict['compress'] = SimpleItem(defvalue = False, doc='Output should be compressed with gzip.') 422 423
424 - def __init__(self):
425 super(TextMerger,self).__init__(_TextMergeTool())
426
427 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
428 self.merge_tool.compress = self.compress 429 #needed as exportmethods doesn't seem to cope with inheritance 430 return super(TextMerger,self).merge(jobs, outputdir, ignorefailed, overwrite)
431
432 -class RootMerger(AbstractMerger):
433 """Merger class for ROOT files 434 435 RootMerger will use the version of ROOT configured in the .gangarc file to 436 add together histograms and trees using the 'hadd' command provided by ROOT. 437 Further details of the hadd command can be found in the ROOT documentation. 438 439 Usage: 440 441 rm = RootMerger() 442 rm.files = ['hist.root','trees.root'] 443 rm.overwrite = True #False by default 444 rm.ignorefailed = True #False by default 445 rm.args = '-f2' #pass arguments to hadd 446 447 # will produce the specified files 448 j = Job() 449 j.outputsandbox = ['hist.root','trees.root'] 450 j.splitter = SomeSplitter() 451 j.merger = rm 452 j.submit() 453 454 The merge object will be used to merge the output of 455 each subjob into j.outputdir. This will be run when 456 the job completes. If the ignorefailed flag has been set 457 then the merge will also be run as the job enters the 458 killed or failed states. 459 460 The above merger object can also be used independently 461 to merge a list of jobs or the subjobs of an single job. 462 463 #rm defined above 464 rm.merge(j, outputdir = '~/merge_dir') 465 rm.merge([.. list of jobs ...], '~/merge_dir', ignorefailed = True, overwrite = False) 466 467 If ignorefailed or overwrite are set then they override the 468 values set on the merge object. 469 470 A summary of all the files merged will be created for each entry in files. 471 This will be created when the merge of those files completes 472 successfully. The name of this is the same as the output file, with the 473 '.merge_summary' extension appended and will be placed in the same directory 474 as the merge results. 475 476 If outputdir is not specified, the default location specfied 477 in the [Mergers] section of the .gangarc file will be used. 478 479 """ 480 481 _category = 'mergers' 482 _exportmethods = ['merge'] 483 _name = 'RootMerger' 484 _schema = AbstractMerger._schema.inherit_copy() 485 _schema.datadict['args'] = SimpleItem(defvalue = None, doc='Arguments to be passed to hadd.',\ 486 typelist=['str','type(None)']) 487
488 - def __init__(self):
489 super(RootMerger,self).__init__(_RootMergeTool())
490
491 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
492 self.merge_tool.args = self.args 493 #needed as exportmethods doesn't seem to cope with inheritance 494 return super(RootMerger,self).merge(jobs, outputdir, ignorefailed, overwrite)
495 496
497 -class MultipleMerger(IMerger):
498 """Merger class when merges of different file types are needed. 499 500 Here is a typical usage example: 501 502 # job produces both Root and Text files 503 j = Job() 504 505 tm = TextMerger() 506 tm.files = ['job.log','stdout'] 507 tm.overwrite = True 508 509 rm = RootMerger() 510 rm.files = ['histo.root','tree.root'] 511 rm.ignorefailed = True 512 513 mm = MultipleMerger() 514 mm.addMerger(tm) 515 mm.addMerger(rm) 516 517 j.merger = mm 518 # All files will be merged on completion 519 j.submit() 520 521 MultipleMerger objects can also be used on 522 individual Jobs or lists of Jobs. 523 524 #mm defined above 525 mm.merge([..list of Jobs ...], outputdir = '~/merge_results', ignorefailed = False, overwrite = True) 526 527 The ignorefailed and overwrite flags are 528 propagated to the individual Merger objects. 529 530 If outputdir is not specified, the default location 531 specfied in the [Mergers] section of the .gangarc 532 file will be used. 533 534 It is permissible to nest MultipleMerger objects 535 inside one another if extra hierarchy is desired. 536 537 """ 538 539 _category = 'mergers' 540 _exportmethods = ['addMerger','merge'] 541 _name = 'MultipleMerger' 542 _schema = Schema(Version(1,0), { 543 'merger_objects' : ComponentItem('mergers', defvalue = [], doc = 'A list of Merge objects to run', sequence = 1) 544 }) 545
546 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
547 #run the merger objects one at a time 548 merge_results = [] 549 for m in self.merger_objects: 550 #stop infinite recursion 551 if m is self: 552 continue 553 #run the merge 554 merge_results.append(m.merge(jobs, outputdir, ignorefailed = ignorefailed, overwrite = overwrite)) 555 #if one fails then we all fail 556 return not False in merge_results
557
558 - def addMerger(self, merger_object):
559 """Adds a merger object to the list of merges to be done.""" 560 self.merger_objects.append(merger_object)
561
562 -def findFilesToMerge(jobs):
563 """Look at a list of jobs and find a set of files present in each job that can be merged together""" 564 565 result = [] 566 567 file_map = {} 568 jobs_len = len(jobs) 569 for j in jobs: 570 for file_name in j.outputsandbox: 571 file_map[file_name] = file_map.setdefault(file_name,0) + 1 572 573 for file_name, count in file_map.iteritems(): 574 if count == jobs_len: result.append(file_name) 575 else: 576 logger.warning('The file %s was not found in all jobs to be merged and so will be ignored.', file_name) 577 logger.info('No files specified, so using %s.', str(result)) 578 579 return result
580 581
582 -class SmartMerger(IMerger):
583 """Allows the different types of merge to be run according to file extension in an automatic way. 584 585 SmartMerger accepts a list of files which it will delegate to individual Merger objects based on 586 the file extension of the file. The mapping between file extensions and Merger objects can 587 be defined in the [Mergers] section of the .gangarc file. Extensions are treated in a case 588 insensitive way. If a file extension is not recognized than the file will be ignored if the 589 ignorefailed flag is set, or the merge will fail. 590 591 Example: 592 593 sm = SmartMerger() 594 sm.files = ['stderr','histo.root','job.log','summary.txt','trees.root','stdout'] 595 sm.merge([... list of jobs ...], outputdir = '~/merge_dir')#also accepts a single Job 596 597 If outputdir is not specified, the default location specfied in the [Mergers] 598 section of the .gangarc file will be used. 599 600 If files is not specified, then it will be taken from the list of jobs given to 601 the merge method. Only files which appear in all jobs will be merged. 602 603 SmartMergers can also be attached to Job objects in the same way as other Merger 604 objects. 605 606 #sm defined above 607 j = Job() 608 j.splitter = SomeSplitter() 609 j.merger = sm 610 j.submit() 611 612 """ 613 614 _category = 'mergers' 615 _exportmethods = ['merge'] 616 _name = 'SmartMerger' 617 _schema = Schema(Version(1,0), { 618 'files' : SimpleItem(defvalue = [], typelist=['str'], sequence = 1, doc='A list of files to merge.'), 619 'ignorefailed' : SimpleItem(defvalue = False, doc='Jobs that are in the failed or killed states will be excluded from the merge when this flag is set to True.'), 620 'overwrite' : SimpleItem(defvalue = False, doc='The default behaviour for this Merger object. Will overwrite output files.') 621 } ) 622
623 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
624 625 #following same as in AbstractMerger 626 if ignorefailed == None: 627 ignorefailed = self.ignorefailed 628 629 if overwrite == None: 630 overwrite = self.overwrite 631 632 # special case the passing of a Job object. 633 from Ganga.GPIDev.Lib.Job import Job 634 if isinstance(jobs,Job): 635 if outputdir is None: 636 outputdir = jobs.outputdir 637 return self.merge(jobs.subjobs,outputdir = outputdir, ignorefailed = ignorefailed, overwrite = overwrite) 638 639 if len(jobs) == 0: 640 logger.warning('The jobslice given was empty. The merge will not continue.') 641 return AbstractMerger.success 642 643 #make a guess of what to merge if nothing is specified 644 if not self.files: 645 self.files = findFilesToMerge(jobs) 646 647 type_map = {} 648 for f in self.files: 649 650 if not getMergerObject(f): 651 652 #find the file extension and check 653 file_ext = os.path.splitext(f)[1].lstrip('.') 654 655 # default to txt 656 if not file_ext: 657 if f in ['stdout','stderr']: 658 file_ext = 'std_merge' 659 elif ignorefailed: 660 logger.warning('File extension not found for file %s and so the file will be ignored. '\ 661 'Check the name of the file.',f) 662 continue 663 else: 664 logger.warning('File extension not found for file %s and so the merge will fail. '\ 665 'Check the name of the file or set the ignorefailed flag.',f) 666 return AbstractMerger.failure 667 668 file_ext = file_ext.lower()#treat as lowercase 669 670 else: 671 #allow per file config 672 file_ext = f 673 674 #store the file association 675 type_map.setdefault(file_ext, []).append(f) 676 677 #we are going to use the MultipleMerger objects to do all this 678 multi_merge = MultipleMerger() 679 for ext in type_map.keys(): 680 681 merge_object = getMergerObject(ext) # returns an instance 682 if merge_object == None: 683 logger.error('Extension %s not recognized and so the merge will fail. '\ 684 'Check the [Mergers] section of your .gangarc file.', ext) 685 return AbstractMerger.failure 686 687 #we have a merge object, so lets go... 688 merge_object.files = type_map[ext] 689 merge_object.ignorefailed = ignorefailed 690 merge_object.overwrite = overwrite 691 #add to multimerge 692 multi_merge.addMerger(merge_object) 693 694 return multi_merge.merge(jobs, outputdir = outputdir, ignorefailed = ignorefailed, overwrite = overwrite)
695
696 -class _CustomMergeTool(IMergeTool):
697 """Allows arbitrary python modules to be used to merge""" 698 699 _category = 'merge_tools' 700 _hidden = 1 701 _name = '_CustomMergeTool' 702 _schema = IMergeTool._schema.inherit_copy() 703 _schema.datadict['module'] = FileItem(defvalue = None, doc='Path to a python module to perform the merge.') 704
705 - def mergefiles(self, file_list, output_file):
706 707 import os 708 if not os.path.exists(self.module.name): 709 raise MergerError("The module '&s' does not exist and so merging will fail.",self.module.name) 710 711 try: 712 713 module_contents = '' 714 result = False 715 716 try: 717 module_file = file(self.module.name) 718 module_contents = module_file.read() 719 finally: 720 module_file.close() 721 722 if module_contents: 723 module_contents += """ 724 _xxxResult = mergefiles(%s , '%s') 725 """ % (file_list, output_file) 726 727 import tempfile 728 out_file = tempfile.mktemp('.py') 729 try: 730 out = file(out_file,'w') 731 out.write(module_contents) 732 finally: 733 out.close() 734 735 ns = {} 736 execfile(out_file,ns) 737 os.unlink(out_file) 738 739 result = ns.get('_xxxResult',1) 740 741 if result != 0: 742 raise MergerError('The merge module returned False or did not complete properly') 743 744 745 except Exception,e: 746 raise MergerError("Merge failed: ('%s')" % str(e))
747 748 749
750 -class CustomMerger(AbstractMerger):
751 """User tool for writing custom merging tools with Python 752 753 Allows a script to be supplied that performs the merge of some custom file type. 754 The script must be a python file which defines the following function: 755 756 def mergefiles(file_list, output_file): 757 758 #perform the merge 759 if not success: 760 return -1 761 else: 762 return 0 763 764 This module will be imported and used by the CustomMerger. The file_list is a 765 list of paths to the files to be merged. output_file is a string path for 766 the output of the merge. This file must exist by the end of the merge or the 767 merge will fail. If the merge cannot proceed, then the function should return a 768 non-zero integer. 769 770 Clearly this tool is provided for advanced ganga usage only, and should be used with 771 this in mind. 772 773 """ 774 _category = 'mergers' 775 _exportmethods = ['merge'] 776 _name = 'CustomMerger' 777 _schema = AbstractMerger._schema.inherit_copy() 778 _schema.datadict['module'] = FileItem(defvalue = None, doc='Path to a python module to perform the merge.') 779 780
781 - def __init__(self):
783
784 - def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
785 if self.module is None or not self.module: 786 logger.error('No custom module specified. The merge will end now') 787 return AbstractMerger.success 788 self.merge_tool.module = self.module 789 #needed as exportmethods doesn't seem to cope with inheritance 790 return super(CustomMerger,self).merge(jobs, outputdir, ignorefailed, overwrite)
791 792 793 794 #configure the plugins 795 allPlugins.add(_CustomMergeTool,'merge_tools','_CustomMergeTool') 796 allPlugins.add(_TextMergeTool,'merge_tools','_TextMergeTool') 797 allPlugins.add(_RootMergeTool,'merge_tools','_RootMergeTool') 798 #we need a default, but don't care much what it is 799 allPlugins.setDefault('merge_tools','_TextMergeTool') 800