
Source Code for Module Ganga.Lib.LCG.CREAM

   1  # CREAM backend 
   2  import os 
   3  import os.path 
   4  import math 
   5  import re 
   6   
   7  from urlparse import urlparse 
   8   
   9  from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm 
  10  from Ganga.Core import GangaException 
  11   
  12  from Ganga.GPIDev.Schema import * 
  13  from Ganga.GPIDev.Lib.File import * 
  14  from Ganga.GPIDev.Adapters.IBackend import IBackend 
  15  from Ganga.Utility.Config import getConfig 
  16  from Ganga.Utility.logging import getLogger, log_user_exception 
  17  from Ganga.Lib.LCG.Utility import * 
  18  from Ganga.Lib.LCG.ElapsedTimeProfiler import ElapsedTimeProfiler 
  19   
  20  from Ganga.Lib.LCG.Grid import Grid 
  21  from Ganga.Lib.LCG.LCG import grids 
  22  from Ganga.Lib.LCG.GridftpSandboxCache import GridftpSandboxCache 
  23   
24 -def __cream_resolveOSBList__(job, jdl):
  25  
  26      osbURIList = [] 
  27  
  28      re_osb = re.compile('^.*OutputSandbox\s+\=\s+\{(.*)\}\s?\]?$') 
  29  
  30      for l in jdl.split(';'): 
  31          m = re_osb.match( l ) 
  32          if m: 
  33              osb = m.group(1) 
  34              osb = re.sub(r'\s?\"\s?', '', osb) 
  35  
  36              for f in osb.split(','): 
  37                  if not urlparse(f)[0]: 
  38                      osbURIList.append('%s/%s' % ( job.backend.osbURI, os.path.basename(f)) ) 
  39                  else: 
  40                      osbURIList.append(f) 
  41              break 
  42  
  43      return osbURIList 
44
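Note: __cream_resolveOSBList__ extracts the OutputSandbox entries from the JDL text and turns bare file names into full URIs under the job's osbURI, while entries that already carry a scheme are kept as they are. A minimal sketch of the same parsing logic on a hypothetical JDL fragment (endpoint and file names are illustrative only):

    import re, os.path
    from urlparse import urlparse

    jdl    = '[ OutputSandbox = { "stdout.gz", "gsiftp://se.example.org/data/stderr.gz" } ]'
    osbURI = 'gsiftp://ce.example.org/cream_sandbox/out'   # stands in for job.backend.osbURI

    re_osb = re.compile('^.*OutputSandbox\s+\=\s+\{(.*)\}\s?\]?$')
    for l in jdl.split(';'):
        m = re_osb.match(l)
        if m:
            osb = re.sub(r'\s?\"\s?', '', m.group(1))
            for f in osb.split(','):
                if not urlparse(f)[0]:   # no scheme: prefix with the job's output sandbox URI
                    print '%s/%s' % (osbURI, os.path.basename(f))
                else:                    # already a full URI: keep as-is
                    print f
            break
    # prints:
    #   gsiftp://ce.example.org/cream_sandbox/out/stdout.gz
    #   gsiftp://se.example.org/data/stderr.gz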
45 -class CREAM(IBackend):
  46      '''CREAM backend - direct job submission to gLite CREAM CE''' 
  47      _schema = Schema(Version(1,0), { 
  48          'CE' : SimpleItem(defvalue='',doc='CREAM CE endpoint'), 
  49          'jobtype' : SimpleItem(defvalue='Normal',doc='Job type: Normal, MPICH'), 
  50          'requirements' : ComponentItem('LCGRequirements',doc='Requirements for the resource selection'), 
  51          'sandboxcache' : ComponentItem('GridSandboxCache',copyable=1,doc='Interface for handling oversized input sandbox'), 
  52          'id' : SimpleItem(defvalue='',typelist=['str','list'],protected=1,copyable=0,doc='Middleware job identifier'), 
  53          'status' : SimpleItem(defvalue='',typelist=['str','dict'], protected=1,copyable=0,doc='Middleware job status'), 
  54          'exitcode' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Application exit code'), 
  55          'exitcode_cream' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Middleware exit code'), 
  56          'actualCE' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The CREAM CE where the job actually runs.'), 
  57          'reason' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Reason for the current job status'), 
  58          'workernode' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The worker node on which the job actually runs.'), 
  59          'isbURI' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The input sandbox URI on the CREAM CE'), 
  60          'osbURI' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The output sandbox URI on the CREAM CE') 
  61      }) 
  62  
  63      _category = 'backends' 
  64  
  65      _name = 'CREAM' 
  66  
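Note: the schema above defines the attributes visible on the GPI object; only CE, jobtype, requirements and sandboxcache are meant to be set by the user, while the protected items are filled in by the backend and the monitoring loop. A typical submission from the Ganga prompt might look like the following sketch (the CE endpoint is a made-up example):

    j = Job()
    j.backend = CREAM()
    j.backend.CE = 'ce.example.org:8443/cream-pbs-short'   # hypothetical CREAM CE endpoint
    j.submit()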
67 - def __init__(self):
  68          super(CREAM, self).__init__() 
  69  
  70          # dynamic requirement object loading 
  71          try: 
  72              reqName1 = config['Requirements'] 
  73              reqName = config['Requirements'].split('.').pop() 
  74              reqModule = __import__(reqName1, globals(), locals(), [reqName1]) 
  75              reqClass = vars(reqModule)[reqName] 
  76              self.requirements = reqClass() 
  77  
  78              logger.debug('load %s as LCGRequirements' % reqName) 
  79          except: 
  80              logger.debug('load default LCGRequirements') 
  81              pass 
  82  
  83          # dynamic sandbox cache object loading 
  84          ## force to use GridftpSandboxCache 
  85          self.sandboxcache = GridftpSandboxCache() 
  86          try: 
  87              scName1 = config['SandboxCache'] 
  88              scName = config['SandboxCache'].split('.').pop() 
  89              scModule = __import__(scName1, globals(), locals(), [scName1]) 
  90              scClass = vars(scModule)[scName] 
  91              self.sandboxcache = scClass() 
  92              logger.debug('load %s as SandboxCache' % scName) 
  93          except: 
  94              logger.debug('load default SandboxCache') 
  95              pass 
96
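Note: the constructor lets the [LCG] configuration override the default requirements and sandbox cache classes. The option value is expected to be a dotted module path whose last component doubles as the class name; the lookup is a plain __import__ followed by a vars() access, as sketched below with a hypothetical option value:

    reqName1  = 'MyExperiment.MyRequirements'                # hypothetical config['Requirements'] value
    reqName   = reqName1.split('.').pop()                    # 'MyRequirements'
    reqModule = __import__(reqName1, globals(), locals(), [reqName1])
    reqClass  = vars(reqModule)[reqName]                     # the class object, instantiated as reqClass()

If the import or the lookup fails, the defaults (LCGRequirements and GridftpSandboxCache) are kept.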
97 - def __refresh_jobinfo__(self,job):
  98          '''Refresh the lcg jobinfo. It will be called after resubmission.''' 
  99          job.backend.status = '' 
 100          job.backend.reason = '' 
 101          job.backend.actualCE = '' 
 102          job.backend.exitcode = '' 
 103          job.backend.exitcode_cream = '' 
 104          job.backend.workernode = '' 
 105          job.backend.isbURI = '' 
 106          job.backend.osbURI = '' 
107
108 - def __setup_sandboxcache__(self, job):
109 '''Sets up the sandbox cache object to adopt the runtime configuration of the LCG backend''' 110 111 re_token = re.compile('^token:(.*):(.*)$') 112 113 self.sandboxcache.vo = config['VirtualOrganisation'] 114 self.sandboxcache.middleware = 'GLITE' 115 self.sandboxcache.timeout = config['SandboxTransferTimeout'] 116 117 if self.sandboxcache._name == 'LCGSandboxCache': 118 if not self.sandboxcache.lfc_host: 119 self.sandboxcache.lfc_host = grids[self.middleware.upper()].__get_lfc_host__() 120 121 if not self.sandboxcache.se: 122 123 token = '' 124 se_host = config['DefaultSE'] 125 m = re_token.match(se_host) 126 if m: 127 token = m.group(1) 128 se_host = m.group(2) 129 130 self.sandboxcache.se = se_host 131 132 if token: 133 self.sandboxcache.srm_token = token 134 135 if (self.sandboxcache.se_type in ['srmv2']) and (not self.sandboxcache.srm_token): 136 self.sandboxcache.srm_token = config['DefaultSRMToken'] 137 138 elif self.sandboxcache._name == 'DQ2SandboxCache': 139 140 ## generate a new dataset name if not given 141 if not self.sandboxcache.dataset_name: 142 from GangaAtlas.Lib.ATLASDataset.DQ2Dataset import dq2outputdatasetname 143 self.sandboxcache.dataset_name,unused = dq2outputdatasetname("%s.input"%get_uuid(), 0, False, '') 144 145 ## subjobs inherits the dataset name from the master job 146 for sj in job.subjobs: 147 sj.backend.sandboxcache.dataset_name = self.sandboxcache.dataset_name 148 149 elif self.sandboxcache._name == 'GridftpSandboxCache': 150 if config['CreamInputSandboxBaseURI']: 151 self.sandboxcache.baseURI = config['CreamInputSandboxBaseURI'] 152 elif self.CE: 153 ce_host = re.sub(r'\:[0-9]+','',self.CE.split('/cream')[0]) 154 self.sandboxcache.baseURI = 'gsiftp://%s/opt/glite/var/cream_sandbox/%s' % ( ce_host, self.sandboxcache.vo ) 155 else: 156 logger.error('baseURI not available for GridftpSandboxCache') 157 return False 158 159 return True
160
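Note: with the default GridftpSandboxCache, the base URI is taken from the CreamInputSandboxBaseURI option when set; otherwise it is derived from the CE endpoint by dropping the port and everything from "/cream" onwards. A small illustration with a made-up endpoint and VO:

    import re
    CE = 'ce.example.org:8443/cream-pbs-short'                 # hypothetical CE endpoint
    vo = 'myvo'                                                # hypothetical VirtualOrganisation
    ce_host = re.sub(r'\:[0-9]+', '', CE.split('/cream')[0])   # -> 'ce.example.org'
    baseURI = 'gsiftp://%s/opt/glite/var/cream_sandbox/%s' % (ce_host, vo)
    # -> 'gsiftp://ce.example.org/opt/glite/var/cream_sandbox/myvo'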
161 - def __check_and_prestage_inputfile__(self, file):
162 '''Checks the given input file size and if it's size is 163 over "BoundSandboxLimit", prestage it to a grid SE. 164 165 The argument is a path of the local file. 166 167 It returns a dictionary containing information to refer to the file: 168 169 idx = {'lfc_host': lfc_host, 170 'local': [the local file pathes], 171 'remote': {'fname1': 'remote index1', 'fname2': 'remote index2', ... } 172 } 173 174 If prestaging failed, None object is returned. 175 176 If the file has been previously uploaded (according to md5sum), 177 the prestaging is ignored and index to the previously uploaded file 178 is returned. 179 ''' 180 181 idx = {'lfc_host':'', 'local':[], 'remote':{}} 182 183 job = self.getJobObject() 184 185 ## read-in the previously uploaded files 186 uploadedFiles = [] 187 188 ## getting the uploaded file list from the master job 189 if job.master: 190 uploadedFiles += job.master.backend.sandboxcache.get_cached_files() 191 192 ## set and get the $LFC_HOST for uploading oversized sandbox 193 self.__setup_sandboxcache__(job) 194 195 uploadedFiles += self.sandboxcache.get_cached_files() 196 197 lfc_host = None 198 199 ## for LCGSandboxCache, take the one specified in the sansboxcache object. 200 ## the value is exactly the same as the one from the local grid shell env. if 201 ## it is not specified exclusively. 202 if self.sandboxcache._name == 'LCGSandboxCache': 203 lfc_host = self.sandboxcache.lfc_host 204 205 ## or in general, query it from the Grid object 206 if not lfc_host: 207 lfc_host = grids[self.sandboxcache.middleware.upper()].__get_lfc_host__() 208 209 idx['lfc_host'] = lfc_host 210 211 abspath = os.path.abspath(file) 212 fsize = os.path.getsize(abspath) 213 214 if fsize > config['BoundSandboxLimit']: 215 216 md5sum = get_md5sum(abspath, ignoreGzipTimestamp=True) 217 218 doUpload = True 219 for uf in uploadedFiles: 220 if uf.md5sum == md5sum: 221 # the same file has been uploaded to the iocache 222 idx['remote'][os.path.basename(file)] = uf.id 223 doUpload = False 224 break 225 226 if doUpload: 227 228 logger.warning('The size of %s is larger than the sandbox limit (%d byte). Please wait while pre-staging ...' % (file,config['BoundSandboxLimit']) ) 229 230 if self.sandboxcache.upload( [abspath] ): 231 remote_sandbox = self.sandboxcache.get_cached_files()[-1] 232 idx['remote'][remote_sandbox.name] = remote_sandbox.id 233 else: 234 logger.error('Oversized sandbox not successfully pre-staged') 235 return None 236 else: 237 idx['local'].append(abspath) 238 239 return idx
240
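Note: the index returned by __check_and_prestage_inputfile__ separates small files, which stay in the local input sandbox, from oversized ones that have been uploaded (or were already uploaded, matched by md5sum) to the grid cache. For one small and one oversized input it could look roughly like this (host names, paths and identifiers are purely illustrative):

    idx = {
        'lfc_host': 'lfc.example.org',
        'local'   : ['/home/user/gangadir/workspace/input/small_input.tgz'],
        'remote'  : {'big_input.tgz': 'gsiftp://se.example.org/cache/big_input.tgz'},
    }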
241 - def __mt_job_prepare__(self, rjobs, subjobconfigs, masterjobconfig):
 242          '''preparing jobs in multiple threads''' 
 243  
 244          logger.warning('preparing %d subjobs ... it may take a while' % len(rjobs)) 
 245  
 246          # prepare the master job (i.e. create shared inputsandbox, etc.) 
 247          master_input_sandbox=IBackend.master_prepare(self,masterjobconfig) 
 248  
 249          ## uploading the master job if it's over the WMS sandbox limitation 
 250          for f in master_input_sandbox: 
 251              master_input_idx = self.__check_and_prestage_inputfile__(f) 
 252  
 253              if not master_input_idx: 
 254                  logger.error('master input sandbox preparation failed: %s' % f) 
 255                  return None 
 256  
 257          # the algorithm for preparing a single bulk job 
 258          class MyAlgorithm(Algorithm): 
 259  
 260              def __init__(self): 
 261                  Algorithm.__init__(self) 
262 263 def process(self, sj_info): 264 my_sc = sj_info[0] 265 my_sj = sj_info[1] 266 267 try: 268 logger.debug("preparing job %s" % my_sj.getFQID('.')) 269 jdlpath = my_sj.backend.preparejob(my_sc, master_input_sandbox) 270 271 if (not jdlpath) or (not os.path.exists(jdlpath)): 272 raise GangaException('job %s not properly prepared' % my_sj.getFQID('.')) 273 274 self.__appendResult__( my_sj.id, jdlpath ) 275 return True 276 except Exception,x: 277 log_user_exception() 278 return False
279 280 mt_data = [] 281 for sc,sj in zip(subjobconfigs,rjobs): 282 mt_data.append( [sc, sj] ) 283 284 myAlg = MyAlgorithm() 285 myData = Data(collection=mt_data) 286 287 runner = MTRunner(name='lcg_jprepare', algorithm=myAlg, data=myData, numThread=10) 288 runner.start() 289 runner.join(-1) 290 291 if len(runner.getDoneList()) < len(mt_data): 292 return None 293 else: 294 # return a JDL file dictionary with subjob ids as keys, JDL file paths as values 295 return runner.getResults() 296
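Note: job preparation and submission both reuse the MTRunner pattern shown above: an Algorithm subclass implements process() for one work item and records successes with __appendResult__(), a Data object carries the list of items, and the runner works through them with a pool of threads. A condensed, self-contained sketch of the pattern (the items and the per-item work are placeholders):

    from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm

    class SquareAlgorithm(Algorithm):
        def process(self, item):
            # per-item work goes here; return False to mark the item as failed
            self.__appendResult__(item, item * item)
            return True

    runner = MTRunner(name='example', algorithm=SquareAlgorithm(),
                      data=Data(collection=[1, 2, 3]), numThread=3)
    runner.start()
    runner.join(-1)
    print runner.getResults()   # dict keyed by the first argument of __appendResult__()

Only items for which process() returned True appear in getDoneList(); the callers above compare its length against the input list to detect partial failures.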
297 - def __mt_bulk_submit__(self, node_jdls):
298 '''submitting jobs in multiple threads''' 299 300 job = self.getJobObject() 301 302 logger.warning('submitting %d subjobs ... it may take a while' % len(node_jdls)) 303 304 # the algorithm for submitting a single bulk job 305 class MyAlgorithm(Algorithm): 306 307 def __init__(self, gridObj, masterInputWorkspace, ce): 308 Algorithm.__init__(self) 309 self.inpw = masterInputWorkspace 310 self.gridObj = gridObj 311 self.ce = ce
312 313 def process(self, jdl_info): 314 my_sj_id = jdl_info[0] 315 my_sj_jdl = jdl_info[1] 316 317 my_sj_jid = self.gridObj.cream_submit(my_sj_jdl, self.ce) 318 319 if not my_sj_jid: 320 return False 321 else: 322 self.__appendResult__( my_sj_id, my_sj_jid ) 323 return True 324 325 mt_data = [] 326 for id, jdl in node_jdls.items(): 327 mt_data.append( (id, jdl) ) 328 329 myAlg = MyAlgorithm(gridObj=grids['GLITE'],masterInputWorkspace=job.getInputWorkspace(), ce=self.CE) 330 myData = Data(collection=mt_data) 331 332 runner = MTRunner(name='cream_jsubmit', algorithm=myAlg, data=myData, numThread=config['SubmissionThread']) 333 runner.start() 334 runner.join(timeout=-1) 335 336 if len(runner.getDoneList()) < len(mt_data): 337 ## not all bulk jobs are successfully submitted. canceling the submitted jobs on WMS immediately 338 logger.error('some bulk jobs not successfully (re)submitted, canceling submitted jobs on WMS') 339 grids['GLITE'].cancelMultiple( runner.getResults().values() ) 340 return None 341 else: 342 return runner.getResults() 343
344 - def __jobWrapperTemplate__(self):
345 '''Create job wrapper''' 346 347 script = """#!/usr/bin/env python 348 #----------------------------------------------------- 349 # This job wrapper script is automatically created by 350 # GANGA LCG backend handler. 351 # 352 # It controls: 353 # 1. unpack input sandbox 354 # 2. invoke application executable 355 # 3. invoke monitoring client 356 #----------------------------------------------------- 357 import os,os.path,shutil,tempfile 358 import sys,popen2,time,traceback 359 360 #bugfix #36178: subprocess.py crashes if python 2.5 is used 361 #try to import subprocess from local python installation before an 362 #import from PYTHON_DIR is attempted some time later 363 try: 364 import subprocess 365 except ImportError: 366 pass 367 368 ## Utility functions ## 369 def timeString(): 370 return time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) 371 372 def printInfo(s): 373 out.write(timeString() + ' [Info]' + ' ' + str(s) + os.linesep) 374 out.flush() 375 376 def printError(s): 377 out.write(timeString() + ' [Error]' + ' ' + str(s) + os.linesep) 378 out.flush() 379 380 def lcg_file_download(vo,guid,localFilePath,timeout=60,maxRetry=3): 381 cmd = 'lcg-cp -t %d --vo %s %s file://%s' % (timeout,vo,guid,localFilePath) 382 383 printInfo('LFC_HOST set to %s' % os.environ['LFC_HOST']) 384 printInfo('lcg-cp timeout: %d' % timeout) 385 386 i = 0 387 rc = 0 388 isDone = False 389 try_again = True 390 391 while try_again: 392 i = i + 1 393 try: 394 ps = os.popen(cmd) 395 status = ps.close() 396 397 if not status: 398 isDone = True 399 printInfo('File %s download from iocache' % os.path.basename(localFilePath)) 400 else: 401 raise IOError("Download file %s from iocache failed with error code: %d, trial %d." % (os.path.basename(localFilePath), status, i)) 402 403 except IOError, e: 404 isDone = False 405 printError(str(e)) 406 407 if isDone: 408 try_again = False 409 elif i == maxRetry: 410 try_again = False 411 else: 412 try_again = True 413 414 return isDone 415 416 ## system command executor with subprocess 417 def execSyscmdSubprocess(cmd, wdir=os.getcwd()): 418 419 import os, subprocess 420 421 global exitcode 422 423 outfile = file('stdout','w') 424 errorfile = file('stderr','w') 425 426 try: 427 child = subprocess.Popen(cmd, cwd=wdir, shell=True, stdout=outfile, stderr=errorfile) 428 429 while 1: 430 exitcode = child.poll() 431 if exitcode is not None: 432 break 433 else: 434 outfile.flush() 435 errorfile.flush() 436 monitor.progress() 437 time.sleep(0.3) 438 finally: 439 monitor.progress() 440 441 outfile.flush() 442 errorfile.flush() 443 outfile.close() 444 errorfile.close() 445 446 return True 447 448 ## system command executor with multi-thread 449 ## stderr/stdout handler 450 def execSyscmdEnhanced(cmd, wdir=os.getcwd()): 451 452 import os, threading 453 454 cwd = os.getcwd() 455 456 isDone = False 457 458 try: 459 ## change to the working directory 460 os.chdir(wdir) 461 462 child = popen2.Popen3(cmd,1) 463 child.tochild.close() # don't need stdin 464 465 class PipeThread(threading.Thread): 466 467 def __init__(self,infile,outfile,stopcb): 468 self.outfile = outfile 469 self.infile = infile 470 self.stopcb = stopcb 471 self.finished = 0 472 threading.Thread.__init__(self) 473 474 def run(self): 475 stop = False 476 while not stop: 477 buf = self.infile.read(10000) 478 self.outfile.write(buf) 479 self.outfile.flush() 480 time.sleep(0.01) 481 stop = self.stopcb() 482 #FIXME: should we do here?: self.infile.read() 483 #FIXME: this is to make sure that all the output is read (if 
more than buffer size of output was produced) 484 self.finished = 1 485 486 def stopcb(poll=False): 487 global exitcode 488 if poll: 489 exitcode = child.poll() 490 return exitcode != -1 491 492 out_thread = PipeThread(child.fromchild, sys.stdout, stopcb) 493 err_thread = PipeThread(child.childerr, sys.stderr, stopcb) 494 495 out_thread.start() 496 err_thread.start() 497 while not out_thread.finished and not err_thread.finished: 498 stopcb(True) 499 monitor.progress() 500 time.sleep(0.3) 501 monitor.progress() 502 503 sys.stdout.flush() 504 sys.stderr.flush() 505 506 isDone = True 507 508 except(Exception,e): 509 isDone = False 510 511 ## return to the original directory 512 os.chdir(cwd) 513 514 return isDone 515 516 ############################################################################################ 517 518 ###INLINEMODULES### 519 520 ############################################################################################ 521 522 ## Main program ## 523 524 outputsandbox = ###OUTPUTSANDBOX### 525 input_sandbox = ###INPUTSANDBOX### 526 wrapperlog = ###WRAPPERLOG### 527 appexec = ###APPLICATIONEXEC### 528 appargs = ###APPLICATIONARGS### 529 appenvs = ###APPLICATIONENVS### 530 timeout = ###TRANSFERTIMEOUT### 531 532 exitcode=-1 533 534 import sys, stat, os, os.path, commands 535 536 # Change to scratch directory if provided 537 scratchdir = '' 538 tmpdir = '' 539 540 orig_wdir = os.getcwd() 541 542 # prepare log file for job wrapper 543 out = open(os.path.join(orig_wdir, wrapperlog),'w') 544 545 if os.getenv('EDG_WL_SCRATCH'): 546 scratchdir = os.getenv('EDG_WL_SCRATCH') 547 elif os.getenv('TMPDIR'): 548 scratchdir = os.getenv('TMPDIR') 549 550 if scratchdir: 551 (status, tmpdir) = commands.getstatusoutput('mktemp -d %s/gangajob_XXXXXXXX' % (scratchdir)) 552 if status == 0: 553 os.chdir(tmpdir) 554 else: 555 ## if status != 0, tmpdir should contains error message so print it to stderr 556 printError('Error making ganga job scratch dir: %s' % tmpdir) 557 printInfo('Unable to create ganga job scratch dir in %s. Run directly in: %s' % ( scratchdir, os.getcwd() ) ) 558 559 ## reset scratchdir and tmpdir to disable the usage of Ganga scratch dir 560 scratchdir = '' 561 tmpdir = '' 562 563 wdir = os.getcwd() 564 565 if scratchdir: 566 printInfo('Changed working directory to scratch directory %s' % tmpdir) 567 try: 568 os.system("ln -s %s %s" % (os.path.join(orig_wdir, 'stdout'), os.path.join(wdir, 'stdout'))) 569 os.system("ln -s %s %s" % (os.path.join(orig_wdir, 'stderr'), os.path.join(wdir, 'stderr'))) 570 except Exception,e: 571 printError(sys.exc_info()[0]) 572 printError(sys.exc_info()[1]) 573 str_traceback = traceback.format_tb(sys.exc_info()[2]) 574 for str_tb in str_traceback: 575 printError(str_tb) 576 printInfo('Linking stdout & stderr to original directory failed. Looking at stdout during job run may not be possible') 577 578 os.environ['PATH'] = '.:'+os.environ['PATH'] 579 580 vo = os.environ['GANGA_LCG_VO'] 581 582 try: 583 printInfo('Job Wrapper start.') 584 585 # download inputsandbox from remote cache 586 for f,guid in input_sandbox['remote'].iteritems(): 587 if not lcg_file_download(vo, guid, os.path.join(wdir,f), timeout=int(timeout)): 588 raise Exception('Download remote input %s:%s failed.' 
% (guid,f) ) 589 else: 590 getPackedInputSandbox(f) 591 592 printInfo('Download inputsandbox from iocache passed.') 593 594 # unpack inputsandbox from wdir 595 for f in input_sandbox['local']: 596 getPackedInputSandbox(os.path.join(orig_wdir,f)) 597 598 printInfo('Unpack inputsandbox passed.') 599 600 printInfo('Loading Python modules ...') 601 602 sys.path.insert(0,os.path.join(wdir,PYTHON_DIR)) 603 604 # check the python library path 605 try: 606 printInfo(' ** PYTHON_DIR: %s' % os.environ['PYTHON_DIR']) 607 except KeyError: 608 pass 609 610 try: 611 printInfo(' ** PYTHONPATH: %s' % os.environ['PYTHONPATH']) 612 except KeyError: 613 pass 614 615 for lib_path in sys.path: 616 printInfo(' ** sys.path: %s' % lib_path) 617 618 ###MONITORING_SERVICE### 619 monitor = createMonitoringObject() 620 monitor.start() 621 622 # execute application 623 624 ## convern appenvs into environment setup script to be 'sourced' before executing the user executable 625 626 printInfo('Prepare environment variables for application executable') 627 628 env_setup_script = os.path.join(os.getcwd(), '__ganga_lcg_env__.sh') 629 630 f = open( env_setup_script, 'w') 631 f.write('#!/bin/sh' + os.linesep ) 632 f.write('##user application environmet setup script generated by Ganga job wrapper' + os.linesep) 633 for k,v in appenvs.items(): 634 635 str_env = 'export %s="%s"' % (k, v) 636 637 printInfo(' ** ' + str_env) 638 639 f.write(str_env + os.linesep) 640 f.close() 641 642 try: #try to make shipped executable executable 643 os.chmod('%s/%s'% (wdir,appexec),stat.S_IXUSR|stat.S_IRUSR|stat.S_IWUSR) 644 except: 645 pass 646 647 status = False 648 try: 649 # use subprocess to run the user's application if the module is available on the worker node 650 import subprocess 651 printInfo('Load application executable with subprocess module') 652 status = execSyscmdSubprocess('source %s; %s %s' % (env_setup_script, appexec, appargs), wdir) 653 except ImportError,err: 654 # otherwise, use separate threads to control process IO pipes 655 printInfo('Load application executable with separate threads') 656 status = execSyscmdEnhanced('source %s; %s %s' % (env_setup_script, appexec, appargs), wdir) 657 658 os.system("cp %s/stdout stdout.1" % orig_wdir) 659 os.system("cp %s/stderr stderr.1" % orig_wdir) 660 661 printInfo('GZipping stdout and stderr...') 662 663 os.system("gzip stdout.1 stderr.1") 664 665 # move them to the original wdir so they can be picked up 666 os.system("mv stdout.1.gz %s/stdout.gz" % orig_wdir) 667 os.system("mv stderr.1.gz %s/stderr.gz" % orig_wdir) 668 669 if not status: 670 raise Exception('Application execution failed.') 671 printInfo('Application execution passed with exit code %d.' 
% exitcode) 672 673 createPackedOutputSandbox(outputsandbox,None,orig_wdir) 674 675 # pack outputsandbox 676 # printInfo('== check output ==') 677 # for line in os.popen('pwd; ls -l').readlines(): 678 # printInfo(line) 679 680 printInfo('Pack outputsandbox passed.') 681 monitor.stop(exitcode) 682 683 # Clean up after us - All log files and packed outputsandbox should be in "wdir" 684 if scratchdir: 685 os.chdir(orig_wdir) 686 os.system("rm %s -rf" % wdir) 687 except Exception,e: 688 printError(sys.exc_info()[0]) 689 printError(sys.exc_info()[1]) 690 str_traceback = traceback.format_tb(sys.exc_info()[2]) 691 for str_tb in str_traceback: 692 printError(str_tb) 693 694 printInfo('Job Wrapper stop.') 695 696 out.close() 697 698 # always return exit code 0 so the in the case of application failure 699 # one can always get stdout and stderr back to the UI for debug. 700 sys.exit(0) 701 """ 702 return script
703
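Note: inside the wrapper, inputs that were pre-staged to a storage element are fetched back with lcg-cp (retried up to maxRetry times) before the application starts. The command assembled by lcg_file_download() would look roughly like this for a hypothetical GUID and target path:

    vo, guid, localFilePath, timeout = 'myvo', 'guid:0a1b2c3d-0000-1111-2222-333344445555', '/tmp/wdir/big_input.tgz', 60
    cmd = 'lcg-cp -t %d --vo %s %s file://%s' % (timeout, vo, guid, localFilePath)
    # -> 'lcg-cp -t 60 --vo myvo guid:0a1b2c3d-0000-1111-2222-333344445555 file:///tmp/wdir/big_input.tgz'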
704 - def preparejob(self,jobconfig,master_job_sandbox):
705 '''Prepare the JDL''' 706 707 script = self.__jobWrapperTemplate__() 708 709 job = self.getJobObject() 710 inpw = job.getInputWorkspace() 711 712 wrapperlog = '__jobscript__.log' 713 714 import Ganga.Core.Sandbox as Sandbox 715 716 script = script.replace('###OUTPUTSANDBOX###',repr(jobconfig.outputbox)) #FIXME: check what happens if 'stdout','stderr' are specified here 717 718 script = script.replace('###APPLICATION_NAME###',job.application._name) 719 script = script.replace('###APPLICATIONEXEC###',repr(jobconfig.getExeString())) 720 script = script.replace('###APPLICATIONARGS###',repr(jobconfig.getArguments())) 721 722 if jobconfig.env: 723 script = script.replace('###APPLICATIONENVS###',repr(jobconfig.env)) 724 else: 725 script = script.replace('###APPLICATIONENVS###',repr({})) 726 727 script = script.replace('###WRAPPERLOG###',repr(wrapperlog)) 728 import inspect 729 script = script.replace('###INLINEMODULES###',inspect.getsource(Sandbox.WNSandbox)) 730 731 mon = job.getMonitoringService() 732 733 self.monInfo = None 734 735 # set the monitoring file by default to the stdout 736 if type(self.monInfo) is type({}): 737 self.monInfo['remotefile'] = 'stdout' 738 739 # try to print out the monitoring service information in debug mode 740 try: 741 logger.debug('job info of monitoring service: %s' % str(self.monInfo)) 742 except: 743 pass 744 745 script = script.replace('###MONITORING_SERVICE###',mon.getWrapperScriptConstructorText()) 746 747 # prepare input/output sandboxes 748 packed_files = jobconfig.getSandboxFiles() + Sandbox.getGangaModulesAsSandboxFiles(Sandbox.getDefaultModules()) + Sandbox.getGangaModulesAsSandboxFiles(mon.getSandboxModules()) 749 sandbox_files = job.createPackedInputSandbox(packed_files) 750 751 ## sandbox of child jobs should include master's sandbox 752 sandbox_files.extend(master_job_sandbox) 753 754 ## check the input file size and pre-upload larger inputs to the iocache 755 lfc_host = '' 756 757 input_sandbox_uris = [] 758 input_sandbox_names = [] 759 760 ick = True 761 762 max_prestaged_fsize = 0 763 for f in sandbox_files: 764 765 idx = self.__check_and_prestage_inputfile__(f) 766 767 if not idx: 768 logger.error('input sandbox preparation failed: %s' % f) 769 ick = False 770 break 771 else: 772 773 if idx['lfc_host']: 774 lfc_host = idx['lfc_host'] 775 776 if idx['remote']: 777 abspath = os.path.abspath(f) 778 fsize = os.path.getsize(abspath) 779 780 if fsize > max_prestaged_fsize: 781 max_prestaged_fsize = fsize 782 783 input_sandbox_uris.append( idx['remote'][ os.path.basename(f) ] ) 784 785 input_sandbox_names.append( os.path.basename( urlparse(f)[2] ) ) 786 787 if idx['local']: 788 input_sandbox_uris += idx['local'] 789 input_sandbox_names.append( os.path.basename(f) ) 790 791 if not ick: 792 logger.error('stop job submission') 793 return None 794 795 ## determin the lcg-cp timeout according to the max_prestaged_fsize 796 ## - using the assumption of 1 MB/sec. 
797 max_prestaged_fsize = 0 798 lfc_host = '' 799 transfer_timeout = config['SandboxTransferTimeout'] 800 predict_timeout = int( math.ceil( max_prestaged_fsize/1000000.0 ) ) 801 802 if predict_timeout > transfer_timeout: 803 transfer_timeout = predict_timeout 804 805 if transfer_timeout < 60: 806 transfer_timeout = 60 807 808 script = script.replace('###TRANSFERTIMEOUT###', '%d' % transfer_timeout) 809 810 ## update the job wrapper with the inputsandbox list 811 script = script.replace('###INPUTSANDBOX###',repr({'remote':{}, 'local': input_sandbox_names })) 812 813 ## write out the job wrapper and put job wrapper into job's inputsandbox 814 scriptPath = inpw.writefile(FileBuffer('__jobscript_%s__' % job.getFQID('.'),script),executable=1) 815 input_sandbox = input_sandbox_uris + [scriptPath] 816 817 for isb in input_sandbox: 818 logger.debug('ISB URI: %s' % isb) 819 820 ## compose output sandbox to include by default the following files: 821 ## - gzipped stdout (transferred only when the JobLogHandler is WMS) 822 ## - gzipped stderr (transferred only when the JobLogHandler is WMS) 823 ## - __jobscript__.log (job wrapper's log) 824 output_sandbox = [wrapperlog] 825 826 if config['JobLogHandler'] in ['WMS']: 827 output_sandbox += ['stdout.gz','stderr.gz'] 828 829 if len(jobconfig.outputbox): 830 output_sandbox += [Sandbox.OUTPUT_TARBALL_NAME] 831 832 ## compose LCG JDL 833 jdl = { 834 'VirtualOrganisation' : config['VirtualOrganisation'], 835 'Executable' : os.path.basename(scriptPath), 836 'Environment': {'GANGA_LCG_VO': config['VirtualOrganisation'], 'GANGA_LOG_HANDLER': config['JobLogHandler'], 'LFC_HOST': lfc_host}, 837 'StdOutput' : 'stdout', 838 'StdError' : 'stderr', 839 'InputSandbox' : input_sandbox, 840 'OutputSandbox' : output_sandbox, 841 'OutputSandboxBaseDestURI': 'gsiftp://localhost' 842 } 843 844 jdl['Environment'].update({'GANGA_LCG_CE': self.CE}) 845 jdl['Requirements'] = self.requirements.merge(jobconfig.requirements).convert() 846 847 if self.jobtype.upper() in ['NORMAL','MPICH']: 848 jdl['JobType'] = self.jobtype.upper() 849 if self.jobtype.upper() == 'MPICH': 850 #jdl['Requirements'].append('(other.GlueCEInfoTotalCPUs >= NodeNumber)') 851 jdl['Requirements'].append('Member("MPICH",other.GlueHostApplicationSoftwareRunTimeEnvironment)') 852 jdl['NodeNumber'] = self.requirements.nodenumber 853 else: 854 logger.warning('JobType "%s" not supported' % self.jobtype) 855 return 856 857 # additional settings from the job 858 # if jobconfig.env: 859 # jdl['Environment'].update(jobconfig.env) 860 861 jdlText = Grid.expandjdl(jdl) 862 logger.debug('subjob JDL: %s' % jdlText) 863 return inpw.writefile(FileBuffer('__jdlfile__',jdlText))
864
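Note: preparejob() fills the ###...### placeholders of the wrapper template with repr() of the corresponding Python values, so the generated wrapper contains valid Python literals, and then writes the JDL produced by Grid.expandjdl() to __jdlfile__ in the job's input workspace. A tiny illustration of the placeholder substitution (executable and arguments are made up):

    template = 'appexec = ###APPLICATIONEXEC###\nappargs = ###APPLICATIONARGS###\n'
    script = template.replace('###APPLICATIONEXEC###', repr('./my_analysis.exe'))
    script = script.replace('###APPLICATIONARGS###', repr(['-n', '1000']))
    print script
    # appexec = './my_analysis.exe'
    # appargs = ['-n', '1000']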
865 - def kill(self):
 866          '''Kill the job''' 
 867          job = self.getJobObject() 
 868  
 869          logger.info('Killing job %s' % job.getFQID('.')) 
 870  
 871          if not self.id: 
 872              logger.warning('Job %s is not running.' % job.getFQID('.')) 
 873              return False 
 874  
 875          return grids['GLITE'].cream_cancelMultiple([self.id]) 
876
877 - def master_kill(self):
878 '''kill the master job to the grid''' 879 880 job = self.getJobObject() 881 882 if not job.master and len(job.subjobs) == 0: 883 return IBackend.master_kill(self) 884 elif job.master: 885 return IBackend.master_kill(self) 886 else: 887 return self.master_bulk_kill()
888
889 - def master_bulk_kill(self):
 890          '''Bulk cancellation of CREAM subjobs''' 
 891  
 892          job = self.getJobObject() 
 893  
 894          ## killing the individually re-submitted subjobs 
 895          logger.debug('cancelling running/submitted subjobs.') 
 896  
 897          ## 1. collect job ids 
 898          ids = [] 
 899          for sj in job.subjobs: 
 900              if sj.status in ['submitted','running'] and sj.backend.id: 
 901                  ids.append(sj.backend.id) 
 902  
 903          ## 2. cancel the collected jobs 
 904          ck = grids['GLITE'].cream_cancelMultiple(ids) 
 905          if not ck: 
 906              logger.warning('Job cancellation failed') 
 907              return False 
 908          else: 
 909              for sj in job.subjobs: 
 910                  if sj.backend.id in ids: 
 911                      sj.updateStatus('killed') 
 912  
 913          return True 
914
915 - def master_bulk_submit(self, rjobs, subjobconfigs, masterjobconfig):
916 '''submit multiple subjobs in parallel, by default using 10 concurrent threads''' 917 918 assert(implies(rjobs,len(subjobconfigs)==len(rjobs))) 919 920 # prepare the subjobs, jdl repository before bulk submission 921 node_jdls = self.__mt_job_prepare__(rjobs, subjobconfigs, masterjobconfig) 922 923 if not node_jdls: 924 logger.error('Some jobs not successfully prepared') 925 return False 926 927 # set all subjobs to submitting status 928 for sj in rjobs: 929 sj.updateStatus('submitting') 930 931 node_jids = self.__mt_bulk_submit__(node_jdls) 932 933 status = False 934 935 if node_jids: 936 for sj in rjobs: 937 if sj.id in node_jids.keys(): 938 sj.backend.id = node_jids[sj.id] 939 sj.backend.CE = self.CE 940 sj.backend.actualCE = sj.backend.CE 941 sj.updateStatus('submitted') 942 sj.info.submit_counter += 1 943 else: 944 logger.warning('subjob %s not successfully submitted' % sj.getFQID('.')) 945 946 status = True 947 948 return status
949
950 - def master_bulk_resubmit(self,rjobs):
951 '''CREAM bulk resubmission''' 952 953 from Ganga.Core import IncompleteJobSubmissionError 954 from Ganga.Utility.logging import log_user_exception 955 956 # job = self.getJobObject() 957 958 # compose master JDL for collection job 959 node_jdls = {} 960 for sj in rjobs: 961 jdlpath = os.path.join(sj.inputdir,'__jdlfile__') 962 node_jdls[sj.id] = jdlpath 963 964 # set all subjobs to submitting status 965 for sj in rjobs: 966 sj.updateStatus('submitting') 967 968 node_jids = self.__mt_bulk_submit__(node_jdls) 969 970 status = False 971 972 if node_jids: 973 for sj in rjobs: 974 if sj.id in node_jids.keys(): 975 self.__refresh_jobinfo__(sj) 976 sj.backend.id = node_jids[sj.id] 977 sj.backend.CE = self.CE 978 sj.backend.actualCE = sj.backend.CE 979 sj.updateStatus('submitted') 980 sj.info.submit_counter += 1 981 else: 982 logger.warning('subjob %s not successfully submitted' % sj.getFQID('.')) 983 984 status = True 985 986 # # set all subjobs to submitted status 987 # # NOTE: this is just a workaround to avoid the unexpected transition 988 # # that turns the master job's status from 'submitted' to 'submitting'. 989 # # As this transition should be allowed to simulate a lock mechanism in Ganga 4, the workaround 990 # # is to set all subjobs' status to 'submitted' so that the transition can be avoided. 991 # # A more clear solution should be implemented with the lock mechanism introduced in Ganga 5. 992 # for sj in rjobs: 993 # sj.updateStatus('submitted') 994 # sj.info.submit_counter += 1 995 996 return status
997
998 - def master_submit(self,rjobs,subjobconfigs,masterjobconfig):
 999          '''Submit the master job to the grid''' 
1000  
1001          profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG')) 
1002          profiler.start() 
1003  
1004          job = self.getJobObject() 
1005  
1006          ## finding CREAM CE endpoint for job submission 
1007          allowed_celist = [] 
1008          try: 
1009              allowed_celist = self.requirements.getce() 
1010              if not self.CE and allowed_celist: 
1011                  self.CE = allowed_celist[0] 
1012          except: 
1013              logger.warning('CREAM CE assignment from AtlasCREAMRequirements failed.') 
1014  
1015          if self.CE and allowed_celist: 
1016              if self.CE not in allowed_celist: 
1017                  logger.warning('submission to CE not allowed: %s, use %s instead' % ( self.CE, allowed_celist[0] ) ) 
1018                  self.CE = allowed_celist[0] 
1019  
1020          if not self.CE: 
1021              raise GangaException('CREAM CE endpoint not set') 
1022  
1023          ## delegate proxy to CREAM CE 
1024          if not grids['GLITE'].cream_proxy_delegation(self.CE): 
1025              logger.warning('proxy delegation to %s failed' % self.CE) 
1026  
1027          ## doing massive job preparation 
1028          if len(job.subjobs) == 0: 
1029              ick = IBackend.master_submit(self,rjobs,subjobconfigs,masterjobconfig) 
1030          else: 
1031              ick = self.master_bulk_submit(rjobs,subjobconfigs,masterjobconfig) 
1032  
1033          profiler.check('==> master_submit() elapsed time') 
1034  
1035          return ick 
1036
1037 - def submit(self,subjobconfig,master_job_sandbox):
1038 '''Submit the job to the grid''' 1039 1040 ick = False 1041 1042 jdlpath = self.preparejob(subjobconfig,master_job_sandbox) 1043 1044 if jdlpath: 1045 self.id = grids['GLITE'].cream_submit(jdlpath,self.CE) 1046 1047 if self.id: 1048 self.actualCE = self.CE 1049 ick = True 1050 1051 return ick
1052
1053 - def master_auto_resubmit(self,rjobs):
1054 """ 1055 Resubmit each subjob individually as bulk resubmission will overwrite 1056 previous master job statuses 1057 """ 1058 1059 # check for master failure - in which case bulk resubmit 1060 mj = self._getParent() 1061 if mj.status == 'failed': 1062 return self.master_resubmit(rjobs) 1063 1064 for j in rjobs: 1065 if not j.backend.master_resubmit([j]): 1066 return False 1067 1068 return True
1069
1070 - def master_resubmit(self,rjobs):
1071 '''Resubmit the master job to the grid''' 1072 1073 profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG')) 1074 profiler.start() 1075 1076 job = self.getJobObject() 1077 1078 ick = False 1079 1080 ## delegate proxy to CREAM CE 1081 if not grids['GLITE'].cream_proxy_delegation(self.CE): 1082 logger.warning('proxy delegation to %s failed' % self.CE) 1083 1084 if not job.master and len(job.subjobs) == 0: 1085 # case 1: master job normal resubmission 1086 logger.debug('rjobs: %s' % str(rjobs)) 1087 logger.debug('mode: master job normal resubmission') 1088 ick = IBackend.master_resubmit(self,rjobs) 1089 1090 elif job.master: 1091 # case 2: individual subjob resubmission 1092 logger.debug('mode: individual subjob resubmission') 1093 ick = IBackend.master_resubmit(self,rjobs) 1094 1095 else: 1096 # case 3: master job bulk resubmission 1097 logger.debug('mode: master job resubmission') 1098 1099 ick = self.master_bulk_resubmit(rjobs) 1100 if not ick: 1101 raise GangaException('CREAM bulk submission failure') 1102 1103 profiler.check('job re-submission elapsed time') 1104 1105 return ick
1106
1107 - def resubmit(self):
1108 '''Resubmit the job''' 1109 1110 ick = False 1111 1112 job = self.getJobObject() 1113 1114 jdlpath = job.getInputWorkspace().getPath("__jdlfile__") 1115 1116 if jdlpath: 1117 self.id = grids['GLITE'].cream_submit(jdlpath,self.CE) 1118 1119 if self.id: 1120 # refresh the lcg job information 1121 self.__refresh_jobinfo__(job) 1122 self.actualCE = self.CE 1123 ick = True 1124 1125 return ick
1126
1127 - def updateMonitoringInformation(jobs):
1128 '''Monitoring loop for normal jobs''' 1129 1130 jobdict = dict([ [job.backend.id,job] for job in jobs if job.backend.id ]) 1131 1132 jobInfoDict = grids['GLITE'].cream_status(jobdict.keys()) 1133 1134 jidListForPurge = [] 1135 1136 ## update job information for those available in jobInfoDict 1137 for id, info in jobInfoDict.items(): 1138 1139 if info: 1140 1141 job = jobdict[id] 1142 1143 if job.backend.status != info['Current Status']: 1144 1145 if info.has_key('Worker Node'): 1146 job.backend.workernode = info['Worker Node'] 1147 1148 if info.has_key('CREAM ISB URI'): 1149 job.backend.isbURI = info['CREAM ISB URI'] 1150 1151 if info.has_key('CREAM OSB URI'): 1152 job.backend.osbURI = info['CREAM OSB URI'] 1153 1154 doStatusUpdate = True 1155 1156 ## no need to update Ganga job status if backend status is not changed 1157 if info['Current Status'] == job.backend.status: 1158 doStatusUpdate = False 1159 1160 ## download output sandboxes if final status is reached 1161 elif info['Current Status'] in ['DONE-OK','DONE-FAILED']: 1162 1163 ## resolve output sandbox URIs based on the JDL information 1164 osbURIList = __cream_resolveOSBList__(job, info['JDL']) 1165 1166 logger.debug('OSB list:') 1167 for f in osbURIList: 1168 logger.debug(f) 1169 1170 if osbURIList: 1171 1172 if grids['GLITE'].cream_get_output( osbURIList, job.outputdir ): 1173 (ick, app_exitcode) = grids['GLITE'].__get_app_exitcode__(job.outputdir) 1174 job.backend.exitcode = app_exitcode 1175 1176 jidListForPurge.append( job.backend.id ) 1177 1178 else: 1179 logger.error('fail to download job output: %s' % jobdict[id].getFQID('.')) 1180 1181 if doStatusUpdate: 1182 job.backend.status = info['Current Status'] 1183 if info.has_key('ExitCode'): 1184 job.backend.exitcode_cream = int( info['ExitCode'] ) 1185 1186 if info.has_key('FailureReason'): 1187 job.backend.reason = info['FailureReason'] 1188 1189 job.backend.updateGangaJobStatus() 1190 else: 1191 logger.warning('fail to retrieve job informaton: %s' % jobdict[id].getFQID('.')) 1192 1193 ## purging the jobs the output has been fetched locally 1194 if jidListForPurge: 1195 grids['GLITE'].cream_purgeMultiple(jidListForPurge)
1196  
1197      updateMonitoringInformation = staticmethod(updateMonitoringInformation) 
1198  
1199 - def updateGangaJobStatus(self):
1200 '''map backend job status to Ganga job status''' 1201 1202 job = self.getJobObject() 1203 1204 if self.status in ['RUNNING','REALLY-RUNNING']: 1205 job.updateStatus('running') 1206 1207 elif self.status == 'DONE-OK': 1208 if job.backend.exitcode and job.backend.exitcode != 0: 1209 job.backend.reason = 'non-zero app. exit code: %s' % repr(job.backend.exitcode) 1210 job.updateStatus('failed') 1211 elif job.backend.exitcode_cream and job.backend.exitcode_cream != 0: 1212 job.backend.reason = 'non-zero CREAM job exit code: %s' % repr(job.backend.exitcode_cream) 1213 job.updateStatus('failed') 1214 else: 1215 job.updateStatus('completed') 1216 1217 elif self.status in ['DONE-FAILED','ABORTED','UNKNOWN']: 1218 job.updateStatus('failed') 1219 1220 elif self.status in ['CANCELLED']: 1221 job.updateStatus('killed') 1222 1223 elif self.status in ['REGISTERED','PENDING','IDLE','HELD']: 1224 pass 1225 1226 else: 1227 logger.warning('Unexpected job status "%s"', self.status)
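Note: the mapping implemented above can be summarised as follows (a reading of the code, not an exhaustive list of CREAM states):

    # CREAM status                           -> Ganga job status
    # RUNNING, REALLY-RUNNING                -> 'running'
    # DONE-OK, both exit codes zero/empty    -> 'completed'
    # DONE-OK, non-zero app or CREAM code    -> 'failed'
    # DONE-FAILED, ABORTED, UNKNOWN          -> 'failed'
    # CANCELLED                              -> 'killed'
    # REGISTERED, PENDING, IDLE, HELD        -> left unchanged
    # anything else                          -> warning logged, status left unchanged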
1228  
1229  logger = getLogger() 
1230  
1231  config = getConfig('LCG') 
1232  
1233  ## add CREAM specific configuration options 
1234  config.addOption('CreamInputSandboxBaseURI', '', 'sets the baseURI for getting the input sandboxes for the job') 
1235  config.addOption('CreamOutputSandboxBaseURI', '', 'sets the baseURI for putting the output sandboxes for the job') 
1236  #config.addOption('CreamPrologue','','sets the prologue script') 
1237  #config.addOption('CreamEpilogue','','sets the epilogue script') 
1238  