Package Ganga :: Package Lib :: Package Batch :: Module Batch
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Lib.Batch.Batch

  1  from Ganga.GPIDev.Adapters.IBackend import IBackend 
  2  from Ganga.GPIDev.Base import GangaObject 
  3  from Ganga.GPIDev.Schema import * 
  4  from Ganga.Core import BackendError 
  5   
  6  import Ganga.Utility.logging 
  7  logger = Ganga.Utility.logging.getLogger() 
  8   
  9  import Ganga.Utility.Config 
 10   
 11  ## FIXME: experimental code 
 12  ## class BatchFilter(Ganga.Utility.logging.logging.Filter): 
 13  ##     def __init__(self): 
 14  ##         Ganga.Utility.logging.logging.Filter.__init__(self) 
 15   
 16  ##     def filter(self,logrecord): 
 17  ##         #print logrecord 
 18  ##         #for l in dir(logrecord): 
 19  ##         #    print l,getattr(logrecord,l) 
 20  ##         return 1 
 21   
 22  ## logger.addFilter(BatchFilter()) 
 23   
 24  from Ganga.Core import FileWorkspace 
 25  import os 
 26   
 27   
 28  # A trival implementation of shell command with stderr/stdout capture 
 29  # This is a self-contained function (with logging). 
 30  # 
 31  # return (exitcode,soutfile,exeflag) 
 32  # soutfile - path where the stdout/stderr is stored 
 33  # exeflag - 0 if the command failed to execute, 1 if it executed 
def shell_cmd(cmd, soutfile=None, allowed_exit=None):
    """Run *cmd* through the shell, capturing stdout and stderr in a file.

    Parameters:
      cmd          - command line to run (handed to the shell via os.system)
      soutfile     - path where stdout/stderr is stored; a temporary file
                     is created when not given
      allowed_exit - list of exit codes that are not logged as errors
                     (defaults to [0])

    Returns (exitcode, soutfile, exeflag):
      exitcode - raw os.system() status
      soutfile - path where the stdout/stderr is stored
      exeflag  - False if the command failed to execute ("command not found"
                 seen in the output), True if it executed
    """
    # None-sentinel instead of a shared mutable default argument.
    if allowed_exit is None:
        allowed_exit = [0]

    if not soutfile:
        import tempfile
        # mkstemp instead of the deprecated, race-prone mktemp; the file is
        # created atomically and then reused by the shell redirection below.
        fd, soutfile = tempfile.mkstemp()
        os.close(fd)

    # FIXME: grabbing stdout is done by shell magic and probably should be implemented in python directly
    cmd = "%s > %s 2>&1" % (cmd, soutfile)

    logger.debug("running shell command: %s", cmd)
    rc = os.system(cmd)

    if rc not in allowed_exit:
        logger.debug('exit status [%d] of command %s', rc, cmd)
        logger.debug('full output is in file: %s', soutfile)
        f = open(soutfile)
        try:
            head = f.read(255)
        finally:
            f.close()
        logger.debug('<first 255 bytes of output>\n%s', head)
        logger.debug('<end of first 255 bytes of output>')

    m = None

    if rc != 0:
        logger.debug('non-zero [%d] exit status of command %s ', rc, cmd)
        import re
        f = open(soutfile)
        try:
            output = f.read()
        finally:
            f.close()
        m = re.compile(r"command not found$", re.M).search(output)

    return rc, soutfile, m is None
60 61
class Batch(IBackend):
    """ Batch submission backend.

    It is assumed that Batch commands (bjobs, bsub etc.) are setup
    correctly. As few assumptions as possible are made about the
    Batch configuration but at certain sites it may not work correctly
    due to a different Batch setup. Tested with CERN and CNAF Batch
    installations.

    Each batch system supports an 'extraopts' field, which allows customisation
    of the way the job is submitted.

    PBS:
    Take environment settings on submitting machine and export to batch job:
    backend.extraopts = "-V"

    Request minimum walltime of 24 hours and minimum memory of 2GByte:
    backend.extraopts = "-l walltime=24:00:00 mem=2gb"

    The above can be combined as:
    backend.extraopts = "-V -l walltime=24:00:00 mem=2gb"

    LSF:
    Sends mail to you when the job is dispatched and begins execution.
    backend.extraopts = "-B"

    Assigns the Ganga job name to the batch job. The job name does not need to
    be unique.
    backend.extraopts = "-J "+ j.name

    Run the job on a host that meets the specified resource requirements.
    A resource requirement string describes the resources a job needs.
    E.g. request 2Gb of memory and 1Gb of swap space
    backend.extraopts = '-R "mem=2048" -R "swp=1024"'

    Kill job if it has exceeded the deadline (i.e. for your presentation)
    backend.extraopts = '-t 07:14:12:59' #Killed if not finished by 14 July before 1 pm
    """
    _schema = Schema(Version(1, 0), {
        'queue': SimpleItem(defvalue='', doc='queue name as defined in your local Batch installation'),
        'extraopts': SimpleItem(defvalue='', doc='extra options for Batch. See help(Batch) for more details'),
        'id': SimpleItem(defvalue='', protected=1, copyable=0, doc='Batch id of the job'),
        'exitcode': SimpleItem(defvalue=None, typelist=['int', 'type(None)'], protected=1, copyable=0, doc='Process exit code'),
        'status': SimpleItem(defvalue='', protected=1, hidden=1, copyable=0, doc='Batch status of the job'),
        'actualqueue': SimpleItem(defvalue='', protected=1, copyable=0, doc='queue name where the job was submitted.'),
        'actualCE': SimpleItem(defvalue='', protected=1, copyable=0, doc='hostname where the job is/was running.')
        })
    _category = 'backends'
    _name = 'Batch'
    # Batch itself is abstract; users see the concrete LSF/PBS/SGE subclasses.
    _hidden = 1
    def __init__(self):
        # No Batch-specific construction state: delegate straight to IBackend.
        super(Batch,self).__init__()
114
115 - def command(klass,cmd,soutfile=None,allowed_exit=[0]):
116 rc,soutfile,ef = shell_cmd(cmd,soutfile,allowed_exit) 117 if not ef: 118 logger.warning('Problem submitting batch job. Maybe your chosen batch system is not available or you have configured it wrongly') 119 logger.warning(file(soutfile).read()) 120 raise BackendError(klass._name,'It seems that %s commands are not installed properly:%s'%(klass._name,file(soutfile).readline())) 121 return rc,soutfile
122 123 command = classmethod(command) 124
125 - def submit(self,jobconfig, master_input_sandbox):
126 127 job = self.getJobObject() 128 129 inw = job.getInputWorkspace() 130 outw = job.getOutputWorkspace() 131 132 #scriptpath = self.preparejob(jobconfig,inw,outw) 133 scriptpath=self.preparejob(jobconfig,master_input_sandbox) 134 135 # FIX from Angelo Carbone 136 stderr_option = '-e '+str(outw.getPath())+'stderr' 137 stdout_option = '-o '+str(outw.getPath())+'stdout' 138 139 queue_option = '' 140 if self.queue: 141 queue_option = '-q '+str(self.queue) 142 143 try: 144 jobnameopt = "-"+self.config['jobnameopt'] 145 except: 146 jobnameopt = False 147 148 if self.extraopts: 149 import re 150 for opt in re.compile(r'(-\w+)').findall(self.extraopts): 151 if opt in ('-o','-e','-oo','-eo'): 152 logger.warning("option %s is forbidden",opt) 153 return False 154 if self.queue and opt == '-q': 155 logger.warning("option %s is forbidden if queue is defined ( queue = '%s')",opt,self.queue) 156 return False 157 if jobnameopt and opt == jobnameopt: 158 jobnameopt = False 159 160 queue_option = queue_option + " " + self.extraopts 161 162 if jobnameopt and job.name != '': 163 queue_option = queue_option + " " + jobnameopt + " " + "'%s'"%(job.name) 164 165 # bugfix #16646 166 if self.config['shared_python_executable']: 167 import sys 168 script_cmd = "%s %s" % (sys.executable,scriptpath) 169 else: 170 script_cmd = scriptpath 171 172 command_str=self.config['submit_str'] % (inw.getPath(),queue_option,stderr_option,stdout_option,script_cmd) 173 self.command_string = command_str 174 rc,soutfile = self.command(command_str) 175 sout = file(soutfile).read() 176 import re 177 m = re.compile(self.config['submit_res_pattern'], re.M).search(sout) 178 if m is None: 179 logger.warning('could not match the output and extract the Batch job identifier!') 180 logger.warning('command output \n %s ',sout) 181 else: 182 self.id = m.group('id') 183 try: 184 queue = m.group('queue') 185 if self.queue != queue: 186 if self.queue: 187 logger.warning('you requested queue "%s" but the job was submitted 
to queue "%s"',self.queue,queue) 188 logger.warning('command output \n %s ',sout) 189 else: 190 logger.info('using default queue "%s"',queue) 191 self.actualqueue = queue 192 except IndexError: 193 logger.info('could not match the output and extract the Batch queue name') 194 195 return rc == 0
196
197 - def resubmit(self):
198 199 job = self.getJobObject() 200 201 inw = job.getInputWorkspace() 202 outw = job.getOutputWorkspace() 203 204 statusfilename = outw.getPath('__jobstatus__') 205 try: 206 os.remove(statusfilename) 207 except OSError,x: 208 if x.errno!=2: 209 logger.warning("OSError:"+str(x)) 210 211 scriptpath = inw.getPath('__jobscript__') 212 stderr_option = '-e '+str(outw.getPath())+'stderr' 213 stdout_option = '-o '+str(outw.getPath())+'stdout' 214 215 queue_option = '' 216 if self.queue: 217 queue_option = '-q '+str(self.queue) 218 219 try: 220 jobnameopt = "-"+self.config['jobnameopt'] 221 except: 222 jobnameopt = False 223 224 if self.extraopts: 225 import re 226 for opt in re.compile(r'(-\w+)').findall(self.extraopts): 227 if opt in ('-o','-e','-oo','-eo'): 228 logger.warning("option %s is forbidden",opt) 229 return False 230 if self.queue and opt == '-q': 231 logger.warning("option %s is forbidden if queue is defined ( queue = '%s')",opt,self.queue) 232 return False 233 if jobnameopt and opt == jobnameopt: 234 jobnameopt = False 235 236 queue_option = queue_option + " " + self.extraopts 237 238 if jobnameopt and job.name != '': 239 queue_option = queue_option + " " + jobnameopt + " " + "'%s'"%(job.name) 240 241 # bugfix #16646 242 if self.config['shared_python_executable']: 243 import sys 244 script_cmd = "%s %s" % (sys.executable,scriptpath) 245 else: 246 script_cmd = scriptpath 247 248 command_str=self.config['submit_str'] % (inw.getPath(),queue_option,stderr_option,stdout_option,script_cmd) 249 self.command_string = command_str 250 rc,soutfile = self.command(command_str) 251 logger.debug('from command get rc: "%d"',rc) 252 if rc == 0: 253 sout = file(soutfile).read() 254 import re 255 m = re.compile(self.config['submit_res_pattern'], re.M).search(sout) 256 if m is None: 257 logger.warning('could not match the output and extract the Batch job identifier!') 258 logger.warning('command output \n %s ',sout) 259 else: 260 self.id = m.group('id') 261 try: 262 queue = 
m.group('queue') 263 if self.queue != queue: 264 if self.queue: 265 logger.warning('you requested queue "%s" but the job was submitted to queue "%s"',self.queue,queue) 266 logger.warning('command output \n %s ',sout) 267 else: 268 logger.info('using default queue "%s"',queue) 269 self.actualqueue = queue 270 except IndexError: 271 logger.info('could not match the output and extract the Batch queue name') 272 else: 273 logger.warning(file(soutfile).read()) 274 275 return rc == 0
276 277
278 - def kill(self):
279 rc,soutfile = self.command(self.config['kill_str'] % (self.id)) 280 281 sout = file(soutfile).read() 282 logger.debug('while killing job %s: rc = %d',self.getJobObject().getFQID('.'),rc) 283 if rc == 0: 284 return True 285 else: 286 import re 287 m = re.compile(self.config['kill_res_pattern'],re.M).search(sout) 288 logger.warning('while killing job %s: %s',self.getJobObject().getFQID('.'), sout) 289 290 return not m==None
291
    def preparejob(self,jobconfig,master_input_sandbox):
        """Create the batch wrapper script for this job.

        Packs the input sandbox, fills the embedded python wrapper-script
        template (the ###...### placeholders) with job- and backend-specific
        values, and writes the result as '__jobscript__' (executable) into
        the job's input workspace.  Returns the path of the written script.
        """
        job = self.getJobObject()
        mon = job.getMonitoringService()
        import Ganga.Core.Sandbox as Sandbox
        # Pack user sandbox files plus the Ganga/monitoring modules the
        # wrapper needs on the worker node.
        subjob_input_sandbox = job.createPackedInputSandbox(jobconfig.getSandboxFiles()
                                                           + Sandbox.getGangaModulesAsSandboxFiles(Sandbox.getDefaultModules())
                                                           + Sandbox.getGangaModulesAsSandboxFiles(mon.getSandboxModules()))

        appscriptpath = [jobconfig.getExeString()] + jobconfig.getArgStrings()
        sharedoutputpath=job.getOutputWorkspace().getPath()
        outputpatterns = jobconfig.outputbox
        environment = jobconfig.env

        # NOTE: this template is executed on the worker node, not here; the
        # ###...### markers are substituted below before the script is written.
        text = """#!/usr/bin/env python

import shutil
import os
import time
import popen2

############################################################################################

###INLINEMODULES###
###INLINEHOSTNAMEFUNCTION###

############################################################################################

input_sandbox = ###INPUT_SANDBOX###
sharedoutputpath = ###SHAREDOUTPUTPATH###
outputpatterns = ###OUTPUTPATTERNS###
appscriptpath = ###APPSCRIPTPATH###
environment = ###ENVIRONMENT###

# jobid is a string
jobid = ###JOBID###

###PREEXECUTE###

def flush_file(f):
    f.flush()
    os.fsync(f.fileno()) #this forces a global flush (cache synchronization on AFS)

def open_file(fname):
    try:
        filehandle=file(fname,'w')
    except IOError,x:
        print 'ERROR: not able to write a status file: ', fname
        print 'ERROR: ',x
        raise
    return filehandle

statusfilename = os.path.join(sharedoutputpath,'__jobstatus__')
heartbeatfilename = os.path.join(sharedoutputpath,'__heartbeat__')

statusfile=open_file(statusfilename)
heartbeatfile=open_file(heartbeatfilename)

line='START: '+ time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) + os.linesep
try:
    line+='PID: ' + os.getenv('###JOBIDNAME###') + os.linesep
    line+='QUEUE: ' + os.getenv('###QUEUENAME###') + os.linesep
    line+='ACTUALCE: ' + hostname() + os.linesep
except:
    pass
statusfile.writelines(line)
flush_file(statusfile)

try:
    import tarfile
except ImportError,x:
    sys.path.insert(0,###TARFILE_PYTHONPATH###)
    import tarfile

# -- WARNING: get the input files including the python modules BEFORE sys.path.insert()
# -- SINCE PYTHON 2.6 THERE WAS A SUBTLE CHANGE OF SEMANTICS IN THIS AREA

for f in input_sandbox:
    getPackedInputSandbox(f)

# -- END OF MOVED CODE BLOCK

import sys
sys.path.insert(0, ###GANGADIR###)
sys.path.insert(0,os.path.join(os.getcwd(),PYTHON_DIR))

try:
    import subprocess
except ImportError,x:
    sys.path.insert(0,###SUBPROCESS_PYTHONPATH###)
    import subprocess

for key,value in environment.iteritems():
    os.environ[key] = value

sysout2 = os.dup(sys.stdout.fileno())
syserr2 = os.dup(sys.stderr.fileno())

print >>sys.stdout,"--- GANGA APPLICATION OUTPUT BEGIN ---"
print >>sys.stderr,"--- GANGA APPLICATION ERROR BEGIN ---"
flush_file(sys.stdout)
flush_file(sys.stderr)

sys.stdout=file('./__syslog__','w')
sys.stderr=sys.stdout

###MONITORING_SERVICE###
monitor = createMonitoringObject()
monitor.start()

result = 255



try:
    child = subprocess.Popen(appscriptpath, shell=False, stdout=sysout2, stderr=syserr2)

    while 1:
        result = child.poll()
        if result is not None:
            break
        monitor.progress()
        heartbeatfile.write('.')
        flush_file(heartbeatfile)
        time.sleep(###HEARTBEATFREQUENCE###)
except Exception,x:
    print 'ERROR: %s'%str(x)

monitor.progress()
flush_file(sys.stdout)
flush_file(sys.stderr)
sys.stdout=sys.__stdout__
sys.stderr=sys.__stderr__
print >>sys.stdout,"--- GANGA APPLICATION OUTPUT END ---"
print >>sys.stderr,"--- GANGA APPLICATION ERROR END ---"

monitor.stop(result)

try:
    filefilter
except:
    filefilter = None

from Ganga.Utility.files import multi_glob, recursive_copy

createOutputSandbox(outputpatterns,filefilter,sharedoutputpath)

for fn in ['__syslog__']:
    try:
        recursive_copy(fn,sharedoutputpath)
    except Exception,x:
        print 'ERROR: (job %s) %s'%(jobid,str(x))

###POSTEXECUTE###

line='EXITCODE: ' + repr(result) + os.linesep
line+='STOP: '+time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) + os.linesep
statusfile.writelines(line)

statusfile.close()
heartbeatfile.close()
os.unlink(heartbeatfilename)

sys.exit(result)
"""

        # Splice helper code and job-specific values into the template.
        import inspect
        import Ganga.Core.Sandbox as Sandbox
        import Ganga.Utility as Utility
        text = text.replace('###INLINEMODULES###',inspect.getsource(Sandbox.WNSandbox))
        text = text.replace('###INLINEHOSTNAMEFUNCTION###',inspect.getsource(Utility.util.hostname))
        text = text.replace('###APPSCRIPTPATH###',repr(appscriptpath))
        #text = text.replace('###SHAREDINPUTPATH###',repr(sharedinputpath))

        logger.debug('subjob input sandbox %s ',subjob_input_sandbox)
        logger.debug('master input sandbox %s ',master_input_sandbox)

        text = text.replace('###INPUT_SANDBOX###',repr(subjob_input_sandbox+master_input_sandbox))
        text = text.replace('###SHAREDOUTPUTPATH###',repr(sharedoutputpath))
        text = text.replace('###OUTPUTPATTERNS###',repr(outputpatterns))
        text = text.replace('###JOBID###',repr(self.getJobObject().getFQID('.')))
        text = text.replace('###ENVIRONMENT###',repr(environment))
        # preexecute/postexecute/jobid_name/queue_name come from the
        # backend-specific config section (LSF/PBS/SGE) defined in this module.
        text = text.replace('###PREEXECUTE###',self.config['preexecute'])
        text = text.replace('###POSTEXECUTE###',self.config['postexecute'])
        text = text.replace('###JOBIDNAME###',self.config['jobid_name'])
        text = text.replace('###QUEUENAME###',self.config['queue_name'])
        text = text.replace('###HEARTBEATFREQUENCE###',self.config['heartbeat_frequency'])

        text = text.replace('###MONITORING_SERVICE###',job.getMonitoringService().getWrapperScriptConstructorText())

        from Ganga.Utility.Config import getConfig

        text = text.replace('###GANGADIR###',repr(getConfig('System')['GANGA_PYTHONPATH']))

        # Fallback paths for tarfile/subprocess on worker nodes with old pythons.
        import Ganga.PACKAGE
        text = text.replace('###SUBPROCESS_PYTHONPATH###',repr(Ganga.PACKAGE.setup.getPackagePath2('subprocess','syspath',force=True)))
        text = text.replace('###TARFILE_PYTHONPATH###',repr(Ganga.PACKAGE.setup.getPackagePath2('tarfile','syspath',force=True)))

        from Ganga.GPIDev.Lib.File import FileBuffer

        return job.getInputWorkspace().writefile(FileBuffer('__jobscript__',text),executable=1)
492
494 495 import re 496 repid = re.compile(r'^PID: (?P<pid>\d+)',re.M) 497 requeue = re.compile(r'^QUEUE: (?P<queue>\S+)',re.M) 498 reactualCE = re.compile(r'^ACTUALCE: (?P<actualCE>\S+)',re.M) 499 reexit = re.compile(r'^EXITCODE: (?P<exitcode>\d+)',re.M) 500 501 def get_last_alive(f): 502 """Time since the statusfile was last touched in seconds""" 503 import os.path,time 504 talive = 0 505 try: 506 talive = time.time()-os.path.getmtime(f) 507 except OSError,x: 508 logger.debug('Problem reading status file: %s (%s)',f,str(x)) 509 510 return talive
511 512 def get_status(f): 513 """Give (pid,queue,actualCE,exit code) for job""" 514 515 pid,queue,actualCE,exitcode=None,None,None,None 516 517 import re 518 try: 519 statusfile=file(f) 520 stat = statusfile.read() 521 except IOError,x: 522 logger.debug('Problem reading status file: %s (%s)',f,str(x)) 523 return pid,queue,actualCE,exitcode 524 525 mpid = repid.search(stat) 526 if mpid: 527 pid = int(mpid.group('pid')) 528 529 mqueue = requeue.search(stat) 530 if mqueue: 531 queue = str(mqueue.group('queue')) 532 533 mactualCE = reactualCE.search(stat) 534 if mactualCE: 535 actualCE = str(mactualCE.group('actualCE')) 536 537 mexit = reexit.search(stat) 538 if mexit: 539 exitcode = int(mexit.group('exitcode')) 540 541 return pid,queue,actualCE,exitcode
542 543 from Ganga.Utility.Config import getConfig 544 for j in jobs: 545 outw=j.getOutputWorkspace() 546 547 statusfile = os.path.join(outw.getPath(),'__jobstatus__') 548 heartbeatfile = os.path.join(outw.getPath(),'__heartbeat__') 549 pid,queue,actualCE,exitcode = get_status(statusfile) 550 551 if j.status == 'submitted': 552 if pid or queue: 553 j.updateStatus('running') 554 555 if pid: 556 j.backend.id = pid 557 558 if queue and queue != j.backend.actualqueue: 559 j.backend.actualqueue = queue 560 561 if actualCE: 562 j.backend.actualCE = actualCE 563 564 if j.status == 'running': 565 if exitcode != None: 566 # Job has finished 567 j.backend.exitcode = exitcode 568 if exitcode == 0: 569 j.updateStatus('completed') 570 else: 571 j.updateStatus('failed') 572 else: 573 # Job is still running. Check if alive 574 time = get_last_alive(heartbeatfile) 575 config = getConfig(j.backend._name) 576 if time>config['timeout']: 577 logger.warning('Job %s has disappeared from the batch system.', str(j.getFQID('.'))) 578 j.updateStatus('failed') 579 580 updateMonitoringInformation = staticmethod(updateMonitoringInformation) 581 582 #____________________________________________________________________________________ 583 584 config = Ganga.Utility.Config.makeConfig('LSF','internal LSF command line interface') 585 586 #fix bug #21687 587 config.addOption('shared_python_executable', False, "Shared PYTHON") 588 589 config.addOption('jobid_name', 'LSB_BATCH_JID', "Name of environment with ID of the job") 590 config.addOption('queue_name', 'LSB_QUEUE', "Name of environment with queue name of the job") 591 config.addOption('heartbeat_frequency', '30', "Heartbeat frequency config variable") 592 593 config.addOption('submit_str', 'cd %s; bsub %s %s %s %s', "String used to submit job to queue") 594 config.addOption('submit_res_pattern', '^Job <(?P<id>\d*)> is submitted to .*queue <(?P<queue>\S*)>', 595 "String pattern for replay from the submit command") 596 597 
config.addOption('kill_str', 'bkill %s', "String used to kill job")
config.addOption('kill_res_pattern',
                 r'(^Job <\d+> is being terminated)|(Job <\d+>: Job has already finished)|(Job <\d+>: No matching job found)',
                 "String pattern for reply from the kill command")

# LSF needs no worker-node set-up before the wrapper script runs.
tempstr = '''
'''
config.addOption('preexecute',tempstr,"String contains commands executed before submitting job to queue")

# Exclude LSF's internal spool files (<10-digit id>.<idx>.out/err) and the
# '.Batch.start' marker from the output sandbox.
tempstr = '''
def filefilter(fn):
    # FILTER OUT Batch INTERNAL INPUT/OUTPUT FILES:
    # 10 digits . any number of digits . err or out
    import re
    internals = re.compile(r'\d{10}\.\d+.(out|err)')
    return internals.match(fn) or fn == '.Batch.start'
'''
config.addOption('postexecute', tempstr,"String contains commands executed after the job completes")
config.addOption('jobnameopt', 'J', "String contains option name for name of job in batch system")
config.addOption('timeout',600,'Timeout in seconds after which a job is declared killed if it has not touched its heartbeat file. Heartbeat is touched every 30s so do not set this below 120 or so.')
class LSF(Batch):
    ''' LSF backend - submit jobs to Load Sharing Facility.'''
    # Reuse the generic Batch schema (queue, extraopts, id, exitcode, ...).
    _schema = Batch._schema.inherit_copy()
    _category = 'backends'
    _name = 'LSF'

    # LSF-specific command templates (bsub/bkill) registered in the 'LSF'
    # config section earlier in this module.
    config = Ganga.Utility.Config.getConfig('LSF')

    def __init__(self):
        super(LSF,self).__init__()



#____________________________________________________________________________________

config = Ganga.Utility.Config.makeConfig('PBS','internal PBS command line interface')

config.addOption('shared_python_executable', False, "Shared PYTHON")

config.addOption('jobid_name', 'PBS_JOBID', "Name of environment with ID of the job")
config.addOption('queue_name', 'PBS_QUEUE', "Name of environment with queue name of the job")
config.addOption('heartbeat_frequency', '30', "Heartbeat frequency config variable")

config.addOption('submit_str', 'cd %s; qsub %s %s %s %s', "String used to submit job to queue")
config.addOption('submit_res_pattern', r'^(?P<id>\d*)\.pbs\s*', "String pattern for reply from the submit command")

config.addOption('kill_str', 'qdel %s', "String used to kill job")
config.addOption('kill_res_pattern', '(^$)|(qdel: Unknown Job Id)', "String pattern for reply from the kill command")

# Run the job out of a private scratch directory under /tmp.
tempstr='''
env = os.environ
jobnumid = env["PBS_JOBID"]
os.system("mkdir /tmp/%s/" %jobnumid)
os.chdir("/tmp/%s/" %jobnumid)
os.environ["PATH"]+=":."
'''
config.addOption('preexecute', tempstr, "String contains commands executed before submitting job to queue")

# Clean the scratch directory up again once the job has finished.
tempstr='''
env = os.environ
jobnumid = env["PBS_JOBID"]
os.chdir("/tmp/")
os.system("rm -rf /tmp/%s/" %jobnumid)
'''
config.addOption('postexecute', tempstr, "String contains commands executed after the job completes")
config.addOption('jobnameopt', 'N', "String contains option name for name of job in batch system")
config.addOption('timeout',600,'Timeout in seconds after which a job is declared killed if it has not touched its heartbeat file. Heartbeat is touched every 30s so do not set this below 120 or so.')

class PBS(Batch):
    ''' PBS backend - submit jobs to Portable Batch System.
    '''
    # Reuse the generic Batch schema (queue, extraopts, id, exitcode, ...).
    _schema = Batch._schema.inherit_copy()
    _category = 'backends'
    _name = 'PBS'

    # PBS-specific command templates (qsub/qdel) registered in the 'PBS'
    # config section earlier in this module.
    config = Ganga.Utility.Config.getConfig('PBS')
    def __init__(self):
        super(PBS,self).__init__()


#____________________________________________________________________________________

config = Ganga.Utility.Config.makeConfig('SGE','internal SGE command line interface')

config.addOption('shared_python_executable', False, "Shared PYTHON")

config.addOption('jobid_name', 'JOB_ID', "Name of environment with ID of the job")
config.addOption('queue_name', 'QUEUE', "Name of environment with queue name of the job")
config.addOption('heartbeat_frequency', '30', "Heartbeat frequency config variable")

#the -V options means that all environment variables are transferred to the batch job (ie the same as the default behaviour on LSF at CERN)
config.addOption('submit_str', 'cd %s; qsub -cwd -V %s %s %s %s', "String used to submit job to queue")
config.addOption('submit_res_pattern', r'Your job (?P<id>\d+) (.+)', "String pattern for reply from the submit command")

config.addOption('kill_str', 'qdel %s', "String used to kill job")
config.addOption('kill_res_pattern', r'(has registered the job +\d+ +for deletion)|(denied: job +"\d+" +does not exist)',
                 "String pattern for reply from the kill command")

#From the SGE man page on qsub
#
#===========================
#Furthermore, Grid Engine sets additional variables into the job's
#environment, as listed below.
#:
#:
#TMPDIR
#   The absolute path to the job's temporary working directory.
#=============================

config.addOption('preexecute', 'os.chdir(os.environ["TMPDIR"])\nos.environ["PATH"]+=":."',
                 "String contains commands executed before submitting job to queue")
config.addOption('postexecute', '', "String contains commands executed after the job completes")
config.addOption('jobnameopt', 'N', "String contains option name for name of job in batch system")
config.addOption('timeout',600,'Timeout in seconds after which a job is declared killed if it has not touched its heartbeat file. Heartbeat is touched every 30s so do not set this below 120 or so.')

class SGE(Batch):
    ''' SGE backend - submit jobs to Sun Grid Engine.
    '''
    # Reuse the generic Batch schema (queue, extraopts, id, exitcode, ...).
    _schema = Batch._schema.inherit_copy()
    _category = 'backends'
    _name = 'SGE'

    # SGE-specific command templates (qsub -cwd -V / qdel) registered in the
    # 'SGE' config section earlier in this module.
    config = Ganga.Utility.Config.getConfig('SGE')
    def __init__(self):
        super(SGE,self).__init__()
725