Home | Trees | Indices | Help |
---|
|
1 ############################################################################### 2 # Ganga Project. http://cern.ch/ganga 3 # 4 # $Id: LCG.py,v 1.39 2009-07-16 10:39:27 hclee Exp $ 5 ############################################################################### 6 # 7 # LCG backend 8 # 9 # ATLAS/ARDA 10 # 11 # Date: August 2005 12 13 import os 14 import re 15 import time 16 import math 17 #import tempfile 18 from types import * 19 20 from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm 21 from Ganga.Core import GangaException 22 23 from Ganga.GPIDev.Schema import * 24 from Ganga.GPIDev.Lib.File import * 25 from Ganga.GPIDev.Adapters.IBackend import IBackend 26 from Ganga.GPIDev.Adapters.StandardJobConfig import StandardJobConfig 27 from Ganga.Utility.Config import makeConfig, getConfig 28 from Ganga.Utility.logging import getLogger, log_user_exception 29 from Ganga.Utility.util import isStringLike 30 from Ganga.Lib.LCG.ElapsedTimeProfiler import ElapsedTimeProfiler 31 from Ganga.Lib.LCG.LCGOutputDownloader import LCGOutputDownloader 32 from Ganga.Lib.LCG.Utility import * 33 34 # for runtime stdout/stderr inspection 35 from Ganga.Lib.MonitoringServices.Octopus import Octopus,ProtocolException 36 37 try: 38 simulator_enabled = os.environ['GANGA_GRID_SIMULATOR'] 39 except KeyError: 40 simulator_enabled = False 41 42 if simulator_enabled: 43 from GridSimulator import GridSimulator as Grid 44 else: 45 from Grid import Grid 4648 global lcg_output_downloader 49 lcg_output_downloader = None 50 if not lcg_output_downloader: 51 lcg_output_downloader = LCGOutputDownloader(numThread=10) 52 lcg_output_downloader.start()53 57 58 start_lcg_output_downloader() 59 60 ## helper routines62 '''failing the Ganga jobs if the associated glite job id is appearing in missing_glite_jids''' 63 64 for glite_jid in missing_glite_jids: 65 if jobdict.has_key( glite_jid ): 66 j = jobdict[glite_jid] 67 68 if j.master: 69 ## this is a subjob 70 j.backend.status = 'Removed' 71 
j.backend.reason = 'job removed from WMS' 72 j.updateStatus('failed') 73 74 else: 75 ## this is a master job 76 for sj in j.subjobs: 77 if sj.backend.parent_id == glite_jid: 78 sj.backend.status = 'Removed' 79 sj.backend.reason = 'job removed from WMS' 80 sj.updateStatus('failed') 81 82 j.updateStatus('failed')8385 '''LCG backend - submit jobs to the EGEE/LCG Grid using gLite/EDG middleware. 86 87 The middleware type (EDG/gLite) may be selected with the middleware 88 attribute. The specific middleware type must be enabled in ganga 89 configuration. See [LCG] section of ~/.gangarc file. 90 91 If the input sandbox exceeds the limit specified in the ganga 92 configuration, it is automatically uploaded to a storage element. This 93 overcomes sandbox size limits on the resource broker. 94 95 For gLite middleware bulk (faster) submission is supported so splitting 96 jobs may be more efficient than submitting bunches of individual jobs. 97 98 For more options see help on LCGRequirements. 99 100 See also: http://cern.ch/glite/documentation 101 ''' 102 103 # internal usage of the flag: 104 # - 0: job without the need of special control 105 # - 1: job (normally a subjob) resubmitted individually. The monitoring of those jobs should be separated. 
106 _schema = Schema(Version(1,9), { 107 'CE' : SimpleItem(defvalue='',doc='Request a specific Computing Element'), 108 'jobtype' : SimpleItem(defvalue='Normal',doc='Job type: Normal, MPICH'), 109 'requirements' : ComponentItem('LCGRequirements',doc='Requirements for the resource selection'), 110 'sandboxcache' : ComponentItem('GridSandboxCache',copyable=1,doc='Interface for handling oversized input sandbox'), 111 'parent_id' : SimpleItem(defvalue='',protected=1,copyable=0,hidden=1,doc='Middleware job identifier for its parent job'), 112 'id' : SimpleItem(defvalue='',typelist=['str','list'],protected=1,copyable=0,doc='Middleware job identifier'), 113 'status' : SimpleItem(defvalue='',typelist=['str','dict'], protected=1,copyable=0,doc='Middleware job status'), 114 'middleware' : SimpleItem(defvalue='GLITE',protected=0,copyable=1,doc='Middleware type',checkset='__checkset_middleware__'), 115 'exitcode' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Application exit code'), 116 'exitcode_lcg' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Middleware exit code'), 117 'reason' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Reason of causing the job status'), 118 'perusable' : SimpleItem(defvalue=False,protected=0,copyable=1,checkset='__checkset_perusable__',doc='Enable the job perusal feature of GLITE'), 119 'actualCE' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Computing Element where the job actually runs.'), 120 'monInfo' : SimpleItem(defvalue={},protected=1,copyable=0,hidden=1,doc='Hidden information of the monitoring service.'), 121 'octopus' : SimpleItem(defvalue=None,typelist=['type(None)', 'Ganga.Lib.MonitoringServices.Octopus.Octopus'],protected=1,copyable=0,transient=1,hidden=1,doc='Hidden transient object for Octopus connection.'), 122 'flag' : SimpleItem(defvalue=0,protected=1,copyable=0,hidden=1,doc='Hidden flag for internal control.') 123 }) 124 125 _category = 'backends' 126 _name = 'LCG' 127 _exportmethods = ['check_proxy', 
'loginfo', 'inspect', 'match'] 128 129 _GUIPrefs = [ { 'attribute' : 'CE', 'widget' : 'String' }, 130 { 'attribute' : 'jobtype', 'widget' : 'String_Choice', 'choices' : ['Normal', 'MPICH'] }, 131 { 'attribute' : 'middleware', 'widget' : 'String_Choice', 'choices' : [ 'EDG', 'GLITE' ] } ] 132 133 _final_ganga_states = ['completing','completed','failed'] 134472 473 def __make_collection_jdl__(self,nodeJDLFiles=[], offset=0): 474 '''Compose the collection JDL for the master job''' 475 476 nodes = ',\n'.join(map(lambda x:'[file = "%s";]' % x, nodeJDLFiles)) 477 478 jdl = { 479 'Type' : 'collection', 480 'VirtualOrganisation' : config['VirtualOrganisation'], 481 'Nodes' : '' 482 } 483 484 # specification of the node jobs 485 node_cnt = offset 486 node_str = '' 487 jdl ['Nodes'] = '{\n'; 488 for f in nodeJDLFiles: 489 node_str += '[NodeName = "gsj_%d"; file="%s";],\n' % (node_cnt, f) 490 node_cnt += 1 491 if node_str: 492 jdl['Nodes'] += node_str.strip()[:-1] 493 jdl['Nodes'] += '\n}'; 494 495 jdlText = Grid.expandjdl(jdl) 496 logger.debug('master job JDL: %s' % jdlText) 497 return jdlText 498 499 # split to multiple glite bulk jobs 500 num_chunks = len(node_jdls) / max_node 501 if len(node_jdls) % max_node > 0: 502 num_chunks += 1 503 504 mt_data = [] 505 506 for i in range(num_chunks): 507 data = {} 508 ibeg = i * max_node 509 iend = min(ibeg + max_node, len(node_jdls) ) 510 data['offset'] = ibeg 511 data['jdls'] = node_jdls[ibeg:iend] 512 mt_data.append(data) 513 514 myAlg = MyAlgorithm(gridObj=grids[mt],masterInputWorkspace=job.getInputWorkspace()) 515 myData = Data(collection=mt_data) 516 517 runner = MTRunner(name='lcg_jsubmit', algorithm=myAlg, data=myData, numThread=config['SubmissionThread']) 518 runner.start() 519 runner.join(timeout=-1) 520 521 if len(runner.getDoneList()) < num_chunks: 522 ## not all bulk jobs are successfully submitted. 
canceling the submitted jobs on WMS immediately 523 logger.error('some bulk jobs not successfully (re)submitted, canceling submitted jobs on WMS') 524 grids[mt].cancelMultiple( runner.getResults().values() ) 525 return None 526 else: 527 return runner.getResults() 528136 super(LCG,self).__init__() 137 if not self.middleware: 138 self.middleware = 'EDG' 139 140 # Disable GLITE perusal by default, since it can be dangerous 141 self.perusable=False 142 143 # dynamic requirement object loading 144 try: 145 reqName1 = config['Requirements'] 146 reqName = config['Requirements'].split('.').pop() 147 reqModule = __import__(reqName1, globals(), locals(), [reqName1]) 148 reqClass = vars(reqModule)[reqName] 149 self.requirements = reqClass() 150 151 logger.debug('load %s as LCGRequirements' % reqName) 152 except: 153 logger.debug('load default LCGRequirements') 154 pass 155 156 # dynamic sandbox cache object loading 157 try: 158 scName1 = config['SandboxCache'] 159 scName = config['SandboxCache'].split('.').pop() 160 scModule = __import__(scName1, globals(), locals(), [scName1]) 161 scClass = vars(scModule)[scName] 162 self.sandboxcache = scClass() 163 logger.debug('load %s as SandboxCache' % scName) 164 except: 165 logger.debug('load default LCGSandboxCAche') 166 pass167169 if value and not value.upper() in ['GLITE','EDG']: 170 raise AttributeError('middleware value must be either \'GLITE\' or \'EDG\'')171173 if value!=False and self.middleware.upper()!='GLITE': 174 raise AttributeError("perusable can only be set for GLITE jobs")175 176178 '''Sets up the sandbox cache object to adopt the runtime configuration of the LCG backend''' 179 180 re_token = re.compile('^token:(.*):(.*)$') 181 182 self.sandboxcache.vo = config['VirtualOrganisation'] 183 self.sandboxcache.middleware = self.middleware.upper() 184 self.sandboxcache.timeout = config['SandboxTransferTimeout'] 185 186 if self.sandboxcache._name == 'LCGSandboxCache': 187 if not self.sandboxcache.lfc_host: 188 
self.sandboxcache.lfc_host = grids[self.middleware.upper()].__get_lfc_host__() 189 190 if not self.sandboxcache.se: 191 192 token = '' 193 se_host = config['DefaultSE'] 194 m = re_token.match(se_host) 195 if m: 196 token = m.group(1) 197 se_host = m.group(2) 198 199 self.sandboxcache.se = se_host 200 201 if token: 202 self.sandboxcache.srm_token = token 203 204 if (self.sandboxcache.se_type in ['srmv2']) and (not self.sandboxcache.srm_token): 205 self.sandboxcache.srm_token = config['DefaultSRMToken'] 206 207 elif self.sandboxcache._name == 'DQ2SandboxCache': 208 209 ## generate a new dataset name if not given 210 if not self.sandboxcache.dataset_name: 211 from GangaAtlas.Lib.ATLASDataset.DQ2Dataset import dq2outputdatasetname 212 self.sandboxcache.dataset_name,unused = dq2outputdatasetname("%s.input"%get_uuid(), 0, False, '') 213 214 ## subjobs inherits the dataset name from the master job 215 for sj in job.subjobs: 216 sj.backend.sandboxcache.dataset_name = self.sandboxcache.dataset_name 217 218 return True219221 '''Checks the given input file size and if it's size is 222 over "BoundSandboxLimit", prestage it to a grid SE. 223 224 The argument is a path of the local file. 225 226 It returns a dictionary containing information to refer to the file: 227 228 idx = {'lfc_host': lfc_host, 229 'local': [the local file pathes], 230 'remote': {'fname1': 'remote index1', 'fname2': 'remote index2', ... } 231 } 232 233 If prestaging failed, None object is returned. 234 235 If the file has been previously uploaded (according to md5sum), 236 the prestaging is ignored and index to the previously uploaded file 237 is returned. 
238 ''' 239 240 idx = {'lfc_host':'', 'local':[], 'remote':{}} 241 242 job = self.getJobObject() 243 244 ## read-in the previously uploaded files 245 uploadedFiles = [] 246 247 ## getting the uploaded file list from the master job 248 if job.master: 249 uploadedFiles += job.master.backend.sandboxcache.get_cached_files() 250 251 ## set and get the $LFC_HOST for uploading oversized sandbox 252 self.__setup_sandboxcache__(job) 253 254 uploadedFiles += self.sandboxcache.get_cached_files() 255 256 lfc_host = None 257 258 ## for LCGSandboxCache, take the one specified in the sansboxcache object. 259 ## the value is exactly the same as the one from the local grid shell env. if 260 ## it is not specified exclusively. 261 if self.sandboxcache._name == 'LCGSandboxCache': 262 lfc_host = self.sandboxcache.lfc_host 263 264 ## or in general, query it from the Grid object 265 if not lfc_host: 266 lfc_host = grids[self.sandboxcache.middleware.upper()].__get_lfc_host__() 267 268 idx['lfc_host'] = lfc_host 269 270 abspath = os.path.abspath(file) 271 fsize = os.path.getsize(abspath) 272 if fsize > config['BoundSandboxLimit']: 273 274 md5sum = get_md5sum(abspath, ignoreGzipTimestamp=True) 275 276 doUpload = True 277 for uf in uploadedFiles: 278 if uf.md5sum == md5sum: 279 # the same file has been uploaded to the iocache 280 idx['remote'][os.path.basename(file)] = uf.id 281 doUpload = False 282 break 283 284 if doUpload: 285 286 logger.warning('The size of %s is larger than the sandbox limit (%d byte). Please wait while pre-staging ...' % (file,config['BoundSandboxLimit']) ) 287 288 if self.sandboxcache.upload( [abspath] ): 289 remote_sandbox = self.sandboxcache.get_cached_files()[-1] 290 idx['remote'][remote_sandbox.name] = remote_sandbox.id 291 else: 292 logger.error('Oversized sandbox not successfully pre-staged') 293 return None 294 else: 295 idx['local'].append(abspath) 296 297 return idx298300 '''Refresh the lcg jobinfo. 
It will be called after resubmission.''' 301 job.backend.status = '' 302 job.backend.reason = '' 303 job.backend.actualCE = '' 304 job.backend.exitcode = '' 305 job.backend.exitcode_lcg = '' 306 job.backend.flag = 0307309 '''Prints out the error message when no matched resource''' 310 311 logger.error('No matched resource: check/report the JDL below') 312 313 f = open( jdl, 'r') 314 lines = f.readlines() 315 f.close() 316 317 logger.error('=== JDL ===\n' + '\n'.join( map(lambda l:l.strip(), lines) ) ) 318 319 return320322 """ 323 Resubmit each subjob individually as bulk resubmission will overwrite 324 previous master job statuses 325 """ 326 327 # check for master failure - in which case bulk resubmit 328 mj = self._getParent() 329 if mj.status == 'failed': 330 return self.master_resubmit(rjobs) 331 332 for j in rjobs: 333 if not j.backend.master_resubmit([j]): 334 return False 335 336 return True337339 '''Submit the master job to the grid''' 340 341 profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG')) 342 profiler.start() 343 344 # if config['DrySubmit']: 345 # logger.warning('No job will be submitted in DrySubmit mode') 346 347 mt = self.middleware.upper() 348 349 job = self.getJobObject() 350 351 ick = False 352 if not config['%s_ENABLE' % mt]: 353 #logger.warning('Operations of %s middleware are disabled.' 
% mt) 354 #ick = False 355 raise GangaException('Operations of %s middleware not enabled' % mt) 356 else: 357 if mt == 'EDG' or len(job.subjobs) == 0: 358 ick = IBackend.master_submit(self,rjobs,subjobconfigs,masterjobconfig) 359 else: 360 ick = self.master_bulk_submit(rjobs,subjobconfigs,masterjobconfig) 361 if not ick: 362 raise GangaException('GLITE bulk submission failure') 363 364 profiler.check('==> master_submit() elapsed time') 365 366 # if config['DrySubmit']: 367 # ick = False 368 369 return ick370372 '''Resubmit the master job to the grid''' 373 374 profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG')) 375 profiler.start() 376 377 # if config['DrySubmit']: 378 # logger.warning('No job will be submitted in DrySubmit mode') 379 380 mt = self.middleware.upper() 381 382 job = self.getJobObject() 383 384 ick = False 385 if not config['%s_ENABLE' % mt]: 386 #logger.warning('Operations of %s middleware are disabled.' % mt) 387 #ick = False 388 raise GangaException('Operations of %s middleware not enabled' % mt) 389 else: 390 if mt == 'EDG': 391 ick = IBackend.master_resubmit(self,rjobs) 392 393 if mt == 'GLITE': 394 if not job.master and len(job.subjobs) == 0: 395 # case 1: master job normal resubmission 396 logger.debug('rjobs: %s' % str(rjobs)) 397 logger.debug('mode: master job normal resubmission') 398 ick = IBackend.master_resubmit(self,rjobs) 399 400 elif job.master: 401 # case 2: individual subjob resubmission 402 logger.debug('mode: individual subjob resubmission') 403 status = IBackend.master_resubmit(self,rjobs) 404 if status: 405 # set the backend flag to 1 if the job is individually submitted 406 # the monitoring loop on the master job shouldn't taken into account this job 407 job.backend.flag = 1 408 ick = status 409 410 else: 411 # case 3: master job bulk resubmission 412 logger.debug('mode: master job bulk resubmission') 413 ick = self.master_bulk_resubmit(rjobs) 414 if not ick: 415 raise GangaException('GLITE bulk submission failure') 
416 417 profiler.check('job re-submission elapsed time') 418 419 # if config['DrySubmit']: 420 # ick = False 421 422 return ick423425 '''kill the master job to the grid''' 426 mt = self.middleware.upper() 427 428 job = self.getJobObject() 429 430 if mt == 'EDG': 431 return IBackend.master_kill(self) 432 433 if mt == 'GLITE': 434 if not job.master and len(job.subjobs) == 0: 435 return IBackend.master_kill(self) 436 elif job.master: 437 #logger.warning('Killing individual subjob in GLITE middleware is an experimental function.') 438 return IBackend.master_kill(self) 439 else: 440 return self.master_bulk_kill()441443 '''submitting bulk jobs in multiple threads''' 444 445 job = self.getJobObject() 446 mt = self.middleware.upper() 447 448 logger.info('submitting %d subjobs ... it may take a while' % len(node_jdls)) 449 450 # the algorithm for submitting a single bulk job 451 class MyAlgorithm(Algorithm): 452 453 def __init__(self, gridObj, masterInputWorkspace): 454 Algorithm.__init__(self) 455 self.inpw = masterInputWorkspace 456 self.gridObj = gridObj457 458 def process(self, node_info): 459 my_node_offset = node_info['offset'] 460 my_node_jdls = node_info['jdls'] 461 coll_jdl_name = '__jdlfile__%d_%d__' % (my_node_offset, my_node_offset + len(my_node_jdls)) 462 # compose master JDL for collection job 463 jdl_cnt = self.__make_collection_jdl__(my_node_jdls, offset=my_node_offset) 464 jdl_path = self.inpw.writefile( FileBuffer(coll_jdl_name, jdl_cnt) ) 465 466 master_jid = self.gridObj.submit(jdl_path,ce=None) 467 if not master_jid: 468 return False 469 else: 470 self.__appendResult__( my_node_offset, master_jid ) 471 return True530 '''preparing jobs in multiple threads''' 531 532 logger.info('preparing %d subjobs ... it may take a while' % len(rjobs)) 533 534 mt = self.middleware.upper() 535 536 job = self.getJobObject() 537 538 # prepare the master job (i.e. create shared inputsandbox, etc.) 
539 master_input_sandbox=IBackend.master_prepare(self,masterjobconfig) 540 541 ## uploading the master job if it's over the WMS sandbox limitation 542 for f in master_input_sandbox: 543 master_input_idx = self.__check_and_prestage_inputfile__(f) 544 545 if not master_input_idx: 546 logger.error('master input sandbox perparation failed: %s' % f) 547 return None 548 549 # the algorithm for preparing a single bulk job 550 class MyAlgorithm(Algorithm): 551 552 def __init__(self): 553 Algorithm.__init__(self)554 555 def process(self, sj_info): 556 my_sc = sj_info[0] 557 my_sj = sj_info[1] 558 559 try: 560 logger.debug("preparing job %s" % my_sj.getFQID('.')) 561 jdlpath = my_sj.backend.preparejob(my_sc, master_input_sandbox) 562 563 if (not jdlpath) or (not os.path.exists(jdlpath)): 564 raise GangaException('job %s not properly prepared' % my_sj.getFQID('.')) 565 566 self.__appendResult__( my_sj.id, jdlpath ) 567 return True 568 except Exception,x: 569 log_user_exception() 570 return False 571 572 573 mt_data = [] 574 for sc,sj in zip(subjobconfigs,rjobs): 575 mt_data.append( [sc, sj] ) 576 577 myAlg = MyAlgorithm() 578 myData = Data(collection=mt_data) 579 580 runner = MTRunner(name='lcg_jprepare', algorithm=myAlg, data=myData, numThread=10) 581 runner.start() 582 runner.join(-1) 583 584 if len(runner.getDoneList()) < len(mt_data): 585 return None 586 else: 587 # the result should be sorted 588 results = runner.getResults() 589 sc_ids = results.keys() 590 sc_ids.sort() 591 592 node_jdls = [] 593 for id in sc_ids: 594 node_jdls.append(results[id]) 595 return node_jdls 596598 '''GLITE bulk submission''' 599 600 from Ganga.Core import IncompleteJobSubmissionError 601 from Ganga.Utility.logging import log_user_exception 602 603 profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG')) 604 profiler.start() 605 606 assert(implies(rjobs,len(subjobconfigs)==len(rjobs))) 607 608 # prepare the subjobs, jdl repository before bulk submission 609 node_jdls = 
self.__mt_job_prepare__(rjobs, subjobconfigs, masterjobconfig) 610 611 if not node_jdls: 612 logger.error('Some jobs not successfully prepared') 613 return False 614 615 profiler.checkAndStart('job preparation elapsed time') 616 617 if config['MatchBeforeSubmit']: 618 mt = self.middleware.upper() 619 matches = grids[mt].list_match(node_jdls[-1], ce=self.CE) 620 if not matches: 621 self.__print_no_resource_error__(node_jdls[-1]) 622 return False 623 624 profiler.checkAndStart('job list-match elapsed time') 625 626 # set all subjobs to submitting status 627 for sj in rjobs: 628 sj.updateStatus('submitting') 629 630 profiler.checkAndStart('job state transition (submitting) elapsed time') 631 632 max_node = config['GliteBulkJobSize'] 633 results = self.__mt_bulk_submit__(node_jdls, max_node=max_node) 634 635 profiler.checkAndStart('job submission elapsed time') 636 637 status = False 638 if results: 639 offsets = results.keys() 640 offsets.sort() 641 642 self.id = [] 643 self.status = {} 644 for ibeg in offsets: 645 mid = results[ibeg] 646 self.id.append(mid) 647 self.status[mid] = '' 648 iend = min(ibeg + max_node, len(node_jdls)) 649 for i in range(ibeg, iend): 650 sj = rjobs[i] 651 sj.backend.parent_id = mid 652 sj.updateStatus('submitted') 653 sj.info.submit_counter += 1 654 655 status = True 656 657 return status658660 '''GLITE bulk resubmission''' 661 662 from Ganga.Core import IncompleteJobSubmissionError 663 from Ganga.Utility.logging import log_user_exception 664 665 job = self.getJobObject() 666 667 # compose master JDL for collection job 668 node_jdls = [] 669 for sj in rjobs: 670 jdlpath = os.path.join(sj.inputdir,'__jdlfile__') 671 node_jdls.append(jdlpath) 672 673 if config['MatchBeforeSubmit']: 674 mt = self.middleware.upper() 675 matches = grids[mt].list_match(node_jdls[-1], ce=self.CE) 676 if not matches: 677 self.__print_no_resource_error__(node_jdls[-1]) 678 return False 679 680 max_node = config['GliteBulkJobSize'] 681 682 results = 
self.__mt_bulk_submit__(node_jdls, max_node=max_node) 683 684 status = False 685 if results: 686 offsets = results.keys() 687 offsets.sort() 688 689 self.__refresh_jobinfo__(job) 690 self.id = [] 691 self.status = {} 692 for ibeg in offsets: 693 mid = results[ibeg] 694 self.id.append(mid) 695 self.status[mid] = '' 696 iend = min(ibeg + max_node, len(node_jdls)) 697 for i in range(ibeg, iend): 698 sj = rjobs[i] 699 sj.backend.id = None 700 sj.backend.parent_id = mid 701 self.__refresh_jobinfo__(sj) 702 sj.updateStatus('submitting') 703 704 # set all subjobs to submitted status 705 # NOTE: this is just a workaround to avoid the unexpected transition 706 # that turns the master job's status from 'submitted' to 'submitting'. 707 # As this transition should be allowed to simulate a lock mechanism in Ganga 4, the workaround 708 # is to set all subjobs' status to 'submitted' so that the transition can be avoided. 709 # A more clear solution should be implemented with the lock mechanism introduced in Ganga 5. 710 for sj in rjobs: 711 sj.updateStatus('submitted') 712 sj.info.submit_counter += 1 713 714 status = True 715 716 return status717719 '''GLITE bulk resubmission''' 720 721 job = self.getJobObject() 722 mt = self.middleware.upper() 723 724 ## killing the individually re-submitted subjobs 725 logger.debug('cancelling individually resubmitted subjobs.') 726 727 ## 1. collect job ids 728 ids = [] 729 for sj in job.subjobs: 730 if sj.backend.flag == 1 and sj.status in ['submitted','running']: 731 ids.append(sj.backend.id) 732 733 ## 2. 
cancel the collected jobs 734 ck = grids[mt].cancelMultiple(ids) 735 if not ck: 736 logger.warning('Job cancellation failed') 737 return False 738 else: 739 for sj in job.subjobs: 740 if sj.backend.flag == 1 and sj.status in ['submitted','running']: 741 sj.updateStatus('killed') 742 743 ## killing the master job 744 logger.debug('cancelling the master job.') 745 746 ## avoid killing master jobs in the final state 747 final_states = ['Aborted','Cancelled','Cleared','Done (Success)','Done (Failed)','Done (Exit Code !=0)'] 748 myids = [] 749 if isStringLike(self.id): 750 if job.backend.status not in final_states: 751 myids.append(self.id) 752 else: 753 for myid in self.id: 754 try: 755 if job.backend.status[myid] not in final_states: 756 myids.append(myid) 757 except KeyError: 758 pass 759 760 ck = grids[mt].native_master_cancel(myids) 761 762 if not ck: 763 logger.warning('Job cancellation failed: %s' % self.id) 764 return False 765 else: 766 for sj in job.subjobs: 767 if sj.backend.flag != 1 and sj.status in ['submitted','running']: 768 sj.updateStatus('killed') 769 return True770772 """Get the job's logging info""" 773 774 job = self.getJobObject() 775 776 logger.debug('Getting logging info of job %s' % job.getFQID('.')) 777 778 mt = self.middleware.upper() 779 780 if not config['%s_ENABLE' % mt]: 781 logger.warning('Operations of %s middleware are disabled.' % mt) 782 return None 783 784 if not self.id: 785 logger.warning('Job %s is not running.' 
% job.getFQID('.')) 786 return None 787 788 if isStringLike(self.id): 789 my_ids = [ self.id ] 790 else: 791 my_ids = self.id 792 793 # successful logging info fetching returns a file path to the information 794 loginfo_output = grids[self.middleware.upper()].get_loginfo(my_ids,job.outputdir,verbosity) 795 796 if loginfo_output: 797 798 # returns the name of the file where the logging info is saved 799 return loginfo_output 800 801 #f = open(loginfo_output,'r') 802 #info = map(lambda x:x.strip(),f.readlines()) 803 #f.close() 804 # returns the logging info as an array of strings 805 #return info 806 else: 807 logger.debug('Getting logging info of job %s failed.' % job.getFQID('.')) 808 return None809811 '''Match the job against available grid resources''' 812 813 ## - grabe the existing __jdlfile__ for failed/completed jobs 814 ## - simulate the job preparation procedure (for jobs never been submitted) 815 ## - subjobs from job splitter are not created (as its not essential for match-making) 816 ## - create a temporary JDL file for match making 817 ## - call job list match 818 ## - clean up the job's inputdir 819 820 job = self.getJobObject() 821 822 ## check job status 823 if job.status not in ['new','submitted','failed','completed']: 824 msg = 'only jobs in \'new\', \'failed\', \'submitted\' or \'completed\' state can do match' 825 logger.warning(msg) 826 return 827 828 from Ganga.Core import ApplicationConfigurationError, JobManagerError, IncompleteJobSubmissionError 829 830 doPrepareEmulation = False 831 832 matches = [] 833 834 mt = self.middleware.upper() 835 836 ## catch the files that are already in inputdir 837 existing_files = os.listdir(job.inputdir) 838 839 app = job.application 840 841 # select the runtime handler 842 from Ganga.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers 843 try: 844 rtHandler = allHandlers.get(app._name,'LCG')() 845 except KeyError: 846 msg = 'runtime handler not found for application=%s and 
backend=%s'%(app._name,'LCG') 847 logger.warning(msg) 848 return 849 850 try: 851 logger.info('matching job %d' % job.id) 852 853 jdlpath = '' 854 855 ## try to pick up the created jdlfile in a failed job 856 if job.status in ['submitted','failed','completed']: 857 858 logger.debug('picking up existing JDL') 859 860 ## looking for existing jdl file 861 if job.master: ## this is a subjob, take the __jdlfile__ in the job's dir 862 jdlpath = os.path.join(job.inputdir,'__jdlfile__') 863 else: 864 if len(job.subjobs) > 0: ## there are subjobs 865 jdlpath = os.path.join(job.subjobs[0].inputdir, '__jdlfile__') 866 else: 867 jdlpath = os.path.join(job.inputdir,'__jdlfile__') 868 869 if not os.path.exists( jdlpath ): 870 jdlpath = '' 871 872 ## simulate the job preparation procedure 873 if not jdlpath: 874 875 logger.debug('emulating the job preparation procedure to create JDL') 876 877 doPrepareEmulation = True 878 879 appmasterconfig = app.master_configure()[1] # FIXME: obsoleted "modified" flag 880 881 ## here we don't do job splitting - presuming the JDL for non-splitted job is the same as the splitted jobs 882 rjobs = [job] 883 884 # configure the application of each subjob 885 appsubconfig = [ j.application.configure(appmasterconfig)[1] for j in rjobs ] #FIXME: obsoleted "modified" flag 886 887 # prepare the master job with the runtime handler 888 jobmasterconfig = rtHandler.master_prepare(app,appmasterconfig) 889 890 # prepare the subjobs with the runtime handler 891 jobsubconfig = [ rtHandler.prepare(j.application,s,appmasterconfig,jobmasterconfig) for (j,s) in zip(rjobs,appsubconfig) ] 892 893 # prepare masterjob's inputsandbox 894 master_input_sandbox = self.master_prepare(jobmasterconfig) 895 896 # prepare JDL 897 jdlpath = self.preparejob(jobsubconfig[0], master_input_sandbox) 898 899 logger.debug('JDL used for match-making: %s' % jdlpath) 900 901 # If GLITE, tell it whether to enable perusal 902 if mt=="GLITE": 903 grids[mt].perusable=self.perusable 904 905 
matches = grids[mt].list_match(jdlpath, ce=self.CE) 906 907 except Exception, x: 908 logger.warning('job match failed: %s', str(x) ) 909 910 ## clean up the job's inputdir 911 if doPrepareEmulation: 912 logger.debug('clean up job inputdir') 913 files = os.listdir(job.inputdir) 914 for f in files: 915 if f not in existing_files: 916 os.remove( os.path.join(job.inputdir, f) ) 917 918 return matches919921 '''Submit the job to the grid''' 922 923 mt = self.middleware.upper() 924 925 if not config['%s_ENABLE' % mt]: 926 logger.warning('Operations of %s middleware are disabled.' % mt) 927 return None 928 929 jdlpath = self.preparejob(subjobconfig,master_job_sandbox) 930 # If GLITE, tell it whether to enable perusal 931 if mt=="GLITE": 932 grids[mt].perusable=self.perusable 933 934 if config['MatchBeforeSubmit']: 935 matches = grids[mt].list_match(jdlpath, ce=self.CE) 936 if not matches: 937 self.__print_no_resource_error__(jdlpath) 938 return None 939 940 self.id = grids[mt].submit(jdlpath, ce=self.CE) 941 942 self.parent_id = self.id 943 944 return not self.id is None945947 '''Resubmit the job''' 948 job = self.getJobObject() 949 950 mt = self.middleware.upper() 951 952 jdlpath = job.getInputWorkspace().getPath("__jdlfile__") 953 954 if config['MatchBeforeSubmit']: 955 matches = grids[mt].list_match(jdlpath, ce=self.CE) 956 if not matches: 957 self.__print_no_resource_error__(jdlpath) 958 return None 959 960 self.id = grids[mt].submit(jdlpath, ce=self.CE) 961 self.parent_id = self.id 962 963 if self.id: 964 # refresh the lcg job information 965 self.__refresh_jobinfo__(job) 966 967 return not self.id is None968970 '''Kill the job''' 971 972 job = self.getJobObject() 973 974 logger.info('Killing job %s' % job.getFQID('.')) 975 976 mt = self.middleware.upper() 977 978 if not config['%s_ENABLE' % mt]: 979 logger.warning('Operations of %s middleware are disabled.' % mt) 980 return False 981 982 if not self.id: 983 logger.warning('Job %s is not running.' 
% job.getFQID('.')) 984 return False 985 986 return grids[self.middleware.upper()].cancel(self.id)987989 '''Create job wrapper''' 990 991 script = """#!/usr/bin/env python 992 #----------------------------------------------------- 993 # This job wrapper script is automatically created by 994 # GANGA LCG backend handler. 995 # 996 # It controls: 997 # 1. unpack input sandbox 998 # 2. invoke application executable 999 # 3. invoke monitoring client 1000 #----------------------------------------------------- 1001 import os,os.path,shutil,tempfile 1002 import sys,popen2,time,traceback 1003 1004 #bugfix #36178: subprocess.py crashes if python 2.5 is used 1005 #try to import subprocess from local python installation before an 1006 #import from PYTHON_DIR is attempted some time later 1007 try: 1008 import subprocess 1009 except ImportError: 1010 pass 1011 1012 ## Utility functions ## 1013 def timeString(): 1014 return time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) 1015 1016 def printInfo(s): 1017 out.write(timeString() + ' [Info]' + ' ' + str(s) + os.linesep) 1018 out.flush() 1019 1020 def printError(s): 1021 out.write(timeString() + ' [Error]' + ' ' + str(s) + os.linesep) 1022 out.flush() 1023 1024 def lcg_file_download(vo,guid,localFilePath,timeout=60,maxRetry=3): 1025 cmd = 'lcg-cp -t %d --vo %s %s file://%s' % (timeout,vo,guid,localFilePath) 1026 1027 printInfo('LFC_HOST set to %s' % os.environ['LFC_HOST']) 1028 printInfo('lcg-cp timeout: %d' % timeout) 1029 1030 i = 0 1031 rc = 0 1032 isDone = False 1033 try_again = True 1034 1035 while try_again: 1036 i = i + 1 1037 try: 1038 ps = os.popen(cmd) 1039 status = ps.close() 1040 1041 if not status: 1042 isDone = True 1043 printInfo('File %s download from iocache' % os.path.basename(localFilePath)) 1044 else: 1045 raise IOError("Download file %s from iocache failed with error code: %d, trial %d." 
% (os.path.basename(localFilePath), status, i)) 1046 1047 except IOError, e: 1048 isDone = False 1049 printError(str(e)) 1050 1051 if isDone: 1052 try_again = False 1053 elif i == maxRetry: 1054 try_again = False 1055 else: 1056 try_again = True 1057 1058 return isDone 1059 1060 ## system command executor with subprocess 1061 def execSyscmdSubprocess(cmd, wdir=os.getcwd()): 1062 1063 import os, subprocess 1064 1065 global exitcode 1066 1067 outfile = file('stdout','w') 1068 errorfile = file('stderr','w') 1069 1070 try: 1071 child = subprocess.Popen(cmd, cwd=wdir, shell=True, stdout=outfile, stderr=errorfile) 1072 1073 while 1: 1074 exitcode = child.poll() 1075 if exitcode is not None: 1076 break 1077 else: 1078 outfile.flush() 1079 errorfile.flush() 1080 monitor.progress() 1081 time.sleep(0.3) 1082 finally: 1083 monitor.progress() 1084 1085 outfile.flush() 1086 errorfile.flush() 1087 outfile.close() 1088 errorfile.close() 1089 1090 return True 1091 1092 ## system command executor with multi-thread 1093 ## stderr/stdout handler 1094 def execSyscmdEnhanced(cmd, wdir=os.getcwd()): 1095 1096 import os, threading 1097 1098 cwd = os.getcwd() 1099 1100 isDone = False 1101 1102 try: 1103 ## change to the working directory 1104 os.chdir(wdir) 1105 1106 child = popen2.Popen3(cmd,1) 1107 child.tochild.close() # don't need stdin 1108 1109 class PipeThread(threading.Thread): 1110 1111 def __init__(self,infile,outfile,stopcb): 1112 self.outfile = outfile 1113 self.infile = infile 1114 self.stopcb = stopcb 1115 self.finished = 0 1116 threading.Thread.__init__(self) 1117 1118 def run(self): 1119 stop = False 1120 while not stop: 1121 buf = self.infile.read(10000) 1122 self.outfile.write(buf) 1123 self.outfile.flush() 1124 time.sleep(0.01) 1125 stop = self.stopcb() 1126 #FIXME: should we do here?: self.infile.read() 1127 #FIXME: this is to make sure that all the output is read (if more than buffer size of output was produced) 1128 self.finished = 1 1129 1130 def stopcb(poll=False): 
1131 global exitcode 1132 if poll: 1133 exitcode = child.poll() 1134 return exitcode != -1 1135 1136 out_thread = PipeThread(child.fromchild, sys.stdout, stopcb) 1137 err_thread = PipeThread(child.childerr, sys.stderr, stopcb) 1138 1139 out_thread.start() 1140 err_thread.start() 1141 while not out_thread.finished and not err_thread.finished: 1142 stopcb(True) 1143 monitor.progress() 1144 time.sleep(0.3) 1145 monitor.progress() 1146 1147 sys.stdout.flush() 1148 sys.stderr.flush() 1149 1150 isDone = True 1151 1152 except(Exception,e): 1153 isDone = False 1154 1155 ## return to the original directory 1156 os.chdir(cwd) 1157 1158 return isDone 1159 1160 ############################################################################################ 1161 1162 ###INLINEMODULES### 1163 1164 ############################################################################################ 1165 1166 ## Main program ## 1167 1168 outputsandbox = ###OUTPUTSANDBOX### 1169 input_sandbox = ###INPUTSANDBOX### 1170 wrapperlog = ###WRAPPERLOG### 1171 appexec = ###APPLICATIONEXEC### 1172 appargs = ###APPLICATIONARGS### 1173 appenvs = ###APPLICATIONENVS### 1174 timeout = ###TRANSFERTIMEOUT### 1175 1176 exitcode=-1 1177 1178 import sys, stat, os, os.path, commands 1179 1180 # Change to scratch directory if provided 1181 scratchdir = '' 1182 tmpdir = '' 1183 1184 orig_wdir = os.getcwd() 1185 1186 # prepare log file for job wrapper 1187 out = open(os.path.join(orig_wdir, wrapperlog),'w') 1188 1189 if os.getenv('EDG_WL_SCRATCH'): 1190 scratchdir = os.getenv('EDG_WL_SCRATCH') 1191 elif os.getenv('TMPDIR'): 1192 scratchdir = os.getenv('TMPDIR') 1193 1194 if scratchdir: 1195 (status, tmpdir) = commands.getstatusoutput('mktemp -d %s/gangajob_XXXXXXXX' % (scratchdir)) 1196 if status == 0: 1197 os.chdir(tmpdir) 1198 else: 1199 ## if status != 0, tmpdir should contains error message so print it to stderr 1200 printError('Error making ganga job scratch dir: %s' % tmpdir) 1201 printInfo('Unable to create 
ganga job scratch dir in %s. Run directly in: %s' % ( scratchdir, os.getcwd() ) ) 1202 1203 ## reset scratchdir and tmpdir to disable the usage of Ganga scratch dir 1204 scratchdir = '' 1205 tmpdir = '' 1206 1207 wdir = os.getcwd() 1208 1209 if scratchdir: 1210 printInfo('Changed working directory to scratch directory %s' % tmpdir) 1211 try: 1212 os.system("ln -s %s %s" % (os.path.join(orig_wdir, 'stdout'), os.path.join(wdir, 'stdout'))) 1213 os.system("ln -s %s %s" % (os.path.join(orig_wdir, 'stderr'), os.path.join(wdir, 'stderr'))) 1214 except Exception,e: 1215 printError(sys.exc_info()[0]) 1216 printError(sys.exc_info()[1]) 1217 str_traceback = traceback.format_tb(sys.exc_info()[2]) 1218 for str_tb in str_traceback: 1219 printError(str_tb) 1220 printInfo('Linking stdout & stderr to original directory failed. Looking at stdout during job run may not be possible') 1221 1222 os.environ['PATH'] = '.:'+os.environ['PATH'] 1223 1224 vo = os.environ['GANGA_LCG_VO'] 1225 1226 try: 1227 printInfo('Job Wrapper start.') 1228 1229 # download inputsandbox from remote cache 1230 for f,guid in input_sandbox['remote'].iteritems(): 1231 if not lcg_file_download(vo, guid, os.path.join(wdir,f), timeout=int(timeout)): 1232 raise Exception('Download remote input %s:%s failed.' 
% (guid,f) ) 1233 else: 1234 getPackedInputSandbox(f) 1235 1236 printInfo('Download inputsandbox from iocache passed.') 1237 1238 # unpack inputsandbox from wdir 1239 for f in input_sandbox['local']: 1240 getPackedInputSandbox(os.path.join(orig_wdir,f)) 1241 1242 printInfo('Unpack inputsandbox passed.') 1243 1244 printInfo('Loading Python modules ...') 1245 1246 sys.path.insert(0,os.path.join(wdir,PYTHON_DIR)) 1247 1248 # check the python library path 1249 try: 1250 printInfo(' ** PYTHON_DIR: %s' % os.environ['PYTHON_DIR']) 1251 except KeyError: 1252 pass 1253 1254 try: 1255 printInfo(' ** PYTHONPATH: %s' % os.environ['PYTHONPATH']) 1256 except KeyError: 1257 pass 1258 1259 for lib_path in sys.path: 1260 printInfo(' ** sys.path: %s' % lib_path) 1261 1262 ###MONITORING_SERVICE### 1263 monitor = createMonitoringObject() 1264 monitor.start() 1265 1266 # execute application 1267 1268 ## convern appenvs into environment setup script to be 'sourced' before executing the user executable 1269 1270 printInfo('Prepare environment variables for application executable') 1271 1272 env_setup_script = os.path.join(os.getcwd(), '__ganga_lcg_env__.sh') 1273 1274 f = open( env_setup_script, 'w') 1275 f.write('#!/bin/sh' + os.linesep ) 1276 f.write('##user application environmet setup script generated by Ganga job wrapper' + os.linesep) 1277 for k,v in appenvs.items(): 1278 1279 str_env = 'export %s="%s"' % (k, v) 1280 1281 printInfo(' ** ' + str_env) 1282 1283 f.write(str_env + os.linesep) 1284 f.close() 1285 1286 try: #try to make shipped executable executable 1287 os.chmod('%s/%s'% (wdir,appexec),stat.S_IXUSR|stat.S_IRUSR|stat.S_IWUSR) 1288 except: 1289 pass 1290 1291 status = False 1292 try: 1293 # use subprocess to run the user's application if the module is available on the worker node 1294 import subprocess 1295 printInfo('Load application executable with subprocess module') 1296 status = execSyscmdSubprocess('source %s; %s %s' % (env_setup_script, appexec, appargs), wdir) 
1297 except ImportError,err: 1298 # otherwise, use separate threads to control process IO pipes 1299 printInfo('Load application executable with separate threads') 1300 status = execSyscmdEnhanced('source %s; %s %s' % (env_setup_script, appexec, appargs), wdir) 1301 1302 os.system("cp %s/stdout stdout.1" % orig_wdir) 1303 os.system("cp %s/stderr stderr.1" % orig_wdir) 1304 1305 printInfo('GZipping stdout and stderr...') 1306 1307 os.system("gzip stdout.1 stderr.1") 1308 1309 # move them to the original wdir so they can be picked up 1310 os.system("mv stdout.1.gz %s/stdout.gz" % orig_wdir) 1311 os.system("mv stderr.1.gz %s/stderr.gz" % orig_wdir) 1312 1313 if not status: 1314 raise Exception('Application execution failed.') 1315 printInfo('Application execution passed with exit code %d.' % exitcode) 1316 1317 createPackedOutputSandbox(outputsandbox,None,orig_wdir) 1318 1319 # pack outputsandbox 1320 # printInfo('== check output ==') 1321 # for line in os.popen('pwd; ls -l').readlines(): 1322 # printInfo(line) 1323 1324 printInfo('Pack outputsandbox passed.') 1325 monitor.stop(exitcode) 1326 1327 # Clean up after us - All log files and packed outputsandbox should be in "wdir" 1328 if scratchdir: 1329 os.chdir(orig_wdir) 1330 os.system("rm %s -rf" % wdir) 1331 except Exception,e: 1332 printError(sys.exc_info()[0]) 1333 printError(sys.exc_info()[1]) 1334 str_traceback = traceback.format_tb(sys.exc_info()[2]) 1335 for str_tb in str_traceback: 1336 printError(str_tb) 1337 1338 printInfo('Job Wrapper stop.') 1339 1340 out.close() 1341 1342 # always return exit code 0 so the in the case of application failure 1343 # one can always get stdout and stderr back to the UI for debug. 1344 sys.exit(0) 1345 """ 1346 return script13471349 """ 1350 Allow peeking of this job's stdout on the WN 1351 (i.e. 
while job is in 'running' state) 1352 1353 Return value: None 1354 """ 1355 if filename and filename != 'stdout': 1356 logger.warning('Arbitrary file peeking not supported for a running LCG job') 1357 else: 1358 self.inspect(command)13591361 """ 1362 Allow viewing of this job's stdout on the WN 1363 (i.e. while job is in 'running' state) 1364 1365 Return value: None 1366 """ 1367 1368 job = self.getJobObject() 1369 1370 # Use GLITE's job perusal feature if enabled 1371 if self.middleware.upper()=="GLITE" and self.status=="Running" and self.perusable: 1372 fname = os.path.join(job.outputdir,'_peek.dat') 1373 #f = open(fname,'w') 1374 1375 sh = grids[self.middleware.upper()].shell 1376 re, output, m = sh.cmd("glite-wms-job-perusal --get --noint --all -f stdout %s" % self.id, fname) 1377 job.viewFile(fname,cmd) 1378 1379 return None13801382 '''Prepare the JDL''' 1383 1384 script = self.__jobWrapperTemplate__() 1385 1386 job = self.getJobObject() 1387 inpw = job.getInputWorkspace() 1388 1389 wrapperlog = '__jobscript__.log' 1390 1391 import Ganga.Core.Sandbox as Sandbox 1392 1393 script = script.replace('###OUTPUTSANDBOX###',repr(jobconfig.outputbox)) #FIXME: check what happens if 'stdout','stderr' are specified here 1394 1395 script = script.replace('###APPLICATION_NAME###',job.application._name) 1396 script = script.replace('###APPLICATIONEXEC###',repr(jobconfig.getExeString())) 1397 script = script.replace('###APPLICATIONARGS###',repr(jobconfig.getArguments())) 1398 1399 if jobconfig.env: 1400 script = script.replace('###APPLICATIONENVS###',repr(jobconfig.env)) 1401 else: 1402 script = script.replace('###APPLICATIONENVS###',repr({})) 1403 1404 script = script.replace('###WRAPPERLOG###',repr(wrapperlog)) 1405 import inspect 1406 script = script.replace('###INLINEMODULES###',inspect.getsource(Sandbox.WNSandbox)) 1407 1408 mon = job.getMonitoringService() 1409 1410 # # catch the monitoring service information of OctopusMS 1411 # if 
mon.getJobInfo().has_key('Ganga.Lib.MonitoringServices.Octopus.OctopusMS.OctopusMS'): 1412 # self.monInfo = mon.getJobInfo()['Ganga.Lib.MonitoringServices.Octopus.OctopusMS.OctopusMS'] 1413 # else: 1414 # self.monInfo = None 1415 1416 self.monInfo = None 1417 1418 # set the monitoring file by default to the stdout 1419 if type(self.monInfo) is type({}): 1420 self.monInfo['remotefile'] = 'stdout' 1421 1422 # try to print out the monitoring service information in debug mode 1423 try: 1424 logger.debug('job info of monitoring service: %s' % str(self.monInfo)) 1425 except: 1426 pass 1427 1428 script = script.replace('###MONITORING_SERVICE###',mon.getWrapperScriptConstructorText()) 1429 1430 # prepare input/output sandboxes 1431 packed_files = jobconfig.getSandboxFiles() + Sandbox.getGangaModulesAsSandboxFiles(Sandbox.getDefaultModules()) + Sandbox.getGangaModulesAsSandboxFiles(mon.getSandboxModules()) 1432 sandbox_files = job.createPackedInputSandbox(packed_files) 1433 1434 ## sandbox of child jobs should include master's sandbox 1435 sandbox_files.extend(master_job_sandbox) 1436 1437 ## check the input file size and pre-upload larger inputs to the iocache 1438 inputs = {'remote':{},'local':[]} 1439 lfc_host = '' 1440 1441 ick = True 1442 1443 max_prestaged_fsize = 0 1444 for f in sandbox_files: 1445 1446 idx = self.__check_and_prestage_inputfile__(f) 1447 1448 if not idx: 1449 logger.error('input sandbox preparation failed: %s' % f) 1450 ick = False 1451 break 1452 else: 1453 if idx['lfc_host']: 1454 lfc_host = idx['lfc_host'] 1455 1456 if idx['remote']: 1457 abspath = os.path.abspath(f) 1458 fsize = os.path.getsize(abspath) 1459 1460 if fsize > max_prestaged_fsize: 1461 max_prestaged_fsize = fsize 1462 1463 inputs['remote'].update(idx['remote']) 1464 1465 if idx['local']: 1466 inputs['local'] += idx['local'] 1467 1468 if not ick: 1469 logger.error('stop job submission') 1470 return None 1471 else: 1472 logger.debug('LFC: %s, input file indices: %s' % (lfc_host, 
repr(inputs)) ) 1473 1474 ## determin the lcg-cp timeout according to the max_prestaged_fsize 1475 ## - using the assumption of 1 MB/sec. 1476 transfer_timeout = config['SandboxTransferTimeout'] 1477 predict_timeout = int( math.ceil( max_prestaged_fsize/1000000.0 ) ) 1478 1479 if predict_timeout > transfer_timeout: 1480 transfer_timeout = predict_timeout 1481 1482 if transfer_timeout < 60: 1483 transfer_timeout = 60 1484 1485 script = script.replace('###TRANSFERTIMEOUT###', '%d' % transfer_timeout) 1486 1487 ## update the job wrapper with the inputsandbox list 1488 script = script.replace('###INPUTSANDBOX###',repr({'remote':inputs['remote'],'local':[ os.path.basename(f) for f in inputs['local'] ]})) 1489 1490 ## write out the job wrapper and put job wrapper into job's inputsandbox 1491 scriptPath = inpw.writefile(FileBuffer('__jobscript_%s__' % job.getFQID('.'),script),executable=1) 1492 input_sandbox = inputs['local'] + [scriptPath] 1493 1494 ## compose output sandbox to include by default the following files: 1495 ## - gzipped stdout (transferred only when the JobLogHandler is WMS) 1496 ## - gzipped stderr (transferred only when the JobLogHandler is WMS) 1497 ## - __jobscript__.log (job wrapper's log) 1498 output_sandbox = [wrapperlog] 1499 1500 if config['JobLogHandler'] == 'WMS': 1501 output_sandbox += ['stdout.gz','stderr.gz'] 1502 1503 if len(jobconfig.outputbox): 1504 output_sandbox += [Sandbox.OUTPUT_TARBALL_NAME] 1505 1506 ## compose LCG JDL 1507 jdl = { 1508 'VirtualOrganisation' : config['VirtualOrganisation'], 1509 'Executable' : os.path.basename(scriptPath), 1510 'Environment': {'GANGA_LCG_VO': config['VirtualOrganisation'], 'GANGA_LOG_HANDLER': config['JobLogHandler'], 'LFC_HOST': lfc_host}, 1511 'StdOutput' : 'stdout', 1512 'StdError' : 'stderr', 1513 'InputSandbox' : input_sandbox, 1514 'OutputSandbox' : output_sandbox 1515 } 1516 1517 if self.middleware.upper() == 'GLITE': 1518 1519 # workaround of glite WMS bug: 
https://savannah.cern.ch/bugs/index.php?32345 1520 jdl['AllowZippedISB']='false' 1521 1522 if self.perusable: 1523 logger.debug("Adding persual info to JDL") 1524 # remove the ExpiryTime attribute as it's absolute timestamp that will cause the re-submitted job being 1525 # ignored by the WMS. TODO: fix it in a better way. 1526 # jdl['ExpiryTime'] = time.time() + config['JobExpiryTime'] 1527 jdl['PerusalFileEnable']='true' 1528 jdl['PerusalTimeInterval']=120 1529 1530 if self.CE: 1531 jdl['Requirements'] = ['other.GlueCEUniqueID=="%s"' % self.CE ] 1532 # send the CE name as an environmental variable of the job if CE is specified 1533 # this is basically for monitoring purpose 1534 jdl['Environment'].update({'GANGA_LCG_CE': self.CE}) 1535 else: 1536 jdl['Requirements'] = self.requirements.merge(jobconfig.requirements).convert() 1537 # input data 1538 if jobconfig.inputdata: 1539 jdl['InputData'] = jobconfig.inputdata 1540 jdl['DataAccessProtocol'] = [ 'gsiftp' ] 1541 1542 if self.jobtype.upper() in ['MPICH','NORMAL','INTERACTIVE']: 1543 jdl['JobType'] = self.jobtype.upper() 1544 if self.jobtype.upper() == 'MPICH': 1545 jdl['Requirements'].append('(other.GlueCEInfoTotalCPUs >= NodeNumber)') 1546 jdl['Requirements'].append('Member("MPICH",other.GlueHostApplicationSoftwareRunTimeEnvironment)') 1547 jdl['NodeNumber'] = self.requirements.nodenumber 1548 else: 1549 logger.warning('JobType "%s" not supported' % self.jobtype) 1550 return 1551 1552 # additional settings from the job 1553 # if jobconfig.env: 1554 # jdl['Environment'].update(jobconfig.env) 1555 1556 # the argument of JDL should be the argument for the wrapper script 1557 # application argument has been put into the wrapper script 1558 # if jobconfig.args: jdl['Arguments'] = jobconfig.getArguments() 1559 1560 # additional settings from the configuration 1561 ## !!note!! 
StorageIndex is not defined in EDG middleware 1562 for name in [ 'ShallowRetryCount', 'RetryCount' ]: 1563 if config[name] >= 0: 1564 jdl[name] = config[name] 1565 1566 for name in [ 'Rank', 'ReplicaCatalog', 'StorageIndex', 'MyProxyServer', 'DataRequirements', 'DataAccessProtocol' ]: 1567 if config[name]: 1568 jdl[name] = config[name] 1569 1570 jdlText = Grid.expandjdl(jdl) 1571 logger.debug('subjob JDL: %s' % jdlText) 1572 return inpw.writefile(FileBuffer('__jdlfile__',jdlText))15731575 '''map backend job status to Ganga job status''' 1576 1577 if status == 'Running': 1578 job.updateStatus('running') 1579 1580 elif status == 'Done (Success)': 1581 job.updateStatus('completed') 1582 1583 elif status in ['Aborted','Cancelled','Done (Exit Code !=0)']: 1584 job.updateStatus('failed') 1585 1586 elif status == 'Cleared': 1587 if job.status in LCG._final_ganga_states: 1588 # do nothing in this case as it's in the middle of the corresponding job downloading task 1589 return 1590 logger.warning('The job %d has reached unexpected the Cleared state and Ganga cannot retrieve the output.',job.getFQID('.')) 1591 job.updateStatus('failed') 1592 1593 elif status in ['Submitted','Waiting','Scheduled','Ready','Done (Failed)']: 1594 pass 1595 1596 else: 1597 logger.warning('Unexpected job status "%s"',status)1598 1599 updateGangaJobStatus = staticmethod(updateGangaJobStatus) 16001602 '''Main Monitoring loop''' 1603 1604 profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG')) 1605 profiler.start() 1606 1607 emulated_bulk_jobs = [] 1608 native_bulk_jobs = [] 1609 1610 for j in jobs: 1611 1612 mt = j.backend.middleware.upper() 1613 1614 if mt == 'EDG' or len(j.subjobs) == 0: 1615 emulated_bulk_jobs.append(j) 1616 else: 1617 native_bulk_jobs.append(j) 1618 # put the individually submitted subjobs into the emulated_bulk_jobs list 1619 # those jobs should be checked individually as a single job 1620 for sj in j.subjobs: 1621 if sj.backend.flag == 1 and sj.status in 
['submitted','running']: 1622 logger.debug('job %s submitted individually. separate it in a different monitoring loop.' % sj.getFQID('.')) 1623 emulated_bulk_jobs.append(sj) 1624 1625 # involk normal monitoring method for normal jobs 1626 for j in emulated_bulk_jobs: 1627 logger.debug('emulated bulk job to be monitored: %s' % j.getFQID('.')) 1628 IBackend.master_updateMonitoringInformation(emulated_bulk_jobs) 1629 1630 # involk special monitoring method for glite bulk jobs 1631 for j in native_bulk_jobs: 1632 logger.debug('native bulk job to be monitored: %s' % j.getFQID('.')) 1633 LCG.master_bulk_updateMonitoringInformation(native_bulk_jobs) 1634 1635 ## should went through all jobs to update overall master job status 1636 for j in jobs: 1637 if ( len(j.subjobs) > 0 ) and j.backend.id: 1638 logger.debug('updating overall master job status: %s' % j.getFQID('.')) 1639 j.updateMasterJobStatus() 1640 1641 profiler.check('==> master_updateMonitoringInformation() elapsed time')1642 1643 master_updateMonitoringInformation = staticmethod(master_updateMonitoringInformation) 16441646 '''Monitoring loop for normal jobs''' 1647 1648 jobdict = dict([ [job.backend.id,job] for job in jobs if job.backend.id ]) 1649 1650 ## divide jobs into classes based on the middleware type 1651 jobclass = {} 1652 for key in jobdict: 1653 mt = jobdict[key].backend.middleware.upper() 1654 if not jobclass.has_key(mt): 1655 jobclass[mt] = [key] 1656 else: 1657 jobclass[mt].append(key) 1658 1659 ## loop over the job classes 1660 cnt_new_download_task = 0 1661 for mt in jobclass.keys(): 1662 1663 if not config['%s_ENABLE' % mt]: 1664 continue 1665 1666 ## loop over the jobs in each class 1667 status_info, missing_glite_jids = grids[mt].status(jobclass[mt]) 1668 1669 __fail_missing_jobs__(missing_glite_jids, jobdict) 1670 1671 for info in status_info: 1672 1673 create_download_task = False 1674 1675 job = jobdict[info['id']] 1676 1677 if job.backend.actualCE != info['destination']: 1678 
logger.info('job %s has been assigned to %s',job.getFQID('.'),info['destination']) 1679 job.backend.actualCE = info['destination'] 1680 1681 if job.backend.status != info['status']: 1682 logger.info('job %s has changed status to %s',job.getFQID('.'),info['status']) 1683 job.backend.status = info['status'] 1684 job.backend.reason = info['reason'] 1685 job.backend.exitcode_lcg = info['exit'] 1686 if info['status'] == 'Done (Success)': 1687 create_download_task = True 1688 else: 1689 LCG.updateGangaJobStatus(job, info['status']) 1690 elif ( info['status'] == 'Done (Success)' ) and ( job.status not in LCG._final_ganga_states ): 1691 create_download_task = True 1692 1693 if create_download_task: 1694 # update to 'running' before changing to 'completing' 1695 if job.status == 'submitted': 1696 job.updateStatus('running') 1697 1698 downloader = get_lcg_output_downloader() 1699 downloader.addTask(grids[mt], job, False) 1700 1701 cnt_new_download_task += 1 1702 1703 if cnt_new_download_task > 0: 1704 downloader = get_lcg_output_downloader() 1705 logger.debug('%d new downloading tasks; %d alive downloading agents' % ( cnt_new_download_task, downloader.countAliveAgent() ) )1706 1707 updateMonitoringInformation = staticmethod(updateMonitoringInformation) 1708 1709 # def master_bulk_updateMonitoringInformation(jobs,updateMasterStatus=True):1711 '''Monitoring loop for glite bulk jobs''' 1712 1713 grid = grids['GLITE'] 1714 1715 if not grid: 1716 return 1717 1718 ## split up the master job into severl LCG bulk job ids 1719 ## - checking subjob status and excluding the master jobs with all subjobs in a final state) 1720 ## - excluding the resubmitted jobs 1721 ## - checking master jobs with the status not being properly updated while all subjobs are in final states 1722 jobdict = {} 1723 #mjob_status_updatelist = [] 1724 for j in jobs: 1725 #cnt_sj_final = 0 1726 if j.backend.id: 1727 1728 ## collect master jobs need to be updated by polling the status from gLite WMS 1729 for sj 
in j.subjobs: 1730 #if (sj.status in ['completed','failed']): 1731 # cnt_sj_final += 1 1732 1733 if (sj.status not in LCG._final_ganga_states) and \ 1734 (sj.backend.parent_id in j.backend.id) and \ 1735 (not jobdict.has_key(sj.backend.parent_id)): 1736 jobdict[sj.backend.parent_id] = j 1737 1738 # if j not in mjob_status_updatelist: 1739 # mjob_status_updatelist.append(j) 1740 1741 ## collect master jobs with status not being updated even when all subjobs are in final states 1742 #if (j.status not in ['completed','failed']) and (cnt_sj_final == len(j.subjobs)): 1743 # if j not in mjob_status_updatelist: 1744 # mjob_status_updatelist.append(j) 1745 1746 job = None 1747 subjobdict = {} 1748 1749 ## make sure all the status information is available 1750 ## if not ... wait for a while and fetch the status again 1751 def check_info(status): 1752 for info in status: 1753 if info['is_node'] and not info['name']: 1754 return False 1755 return True1756 1757 (status_info, missing_glite_jids) = grid.status(jobdict.keys(),is_collection=True) 1758 1759 __fail_missing_jobs__(missing_glite_jids, jobdict) 1760 1761 ## update GANGA job repository according to the available job information 1762 cnt_new_download_task = 0 1763 for info in status_info: 1764 if not info['is_node']: # this is the info for the master job 1765 1766 cachedParentId = info['id'] 1767 master_jstatus = info['status'] 1768 1769 job = jobdict[cachedParentId] 1770 1771 # update master job's status if needed 1772 if cachedParentId not in job.backend.status.keys(): 1773 # if this happens, something must be seriously wrong 1774 logger.warning('job id not found in the submitted master job: %s' % cachedParentId) 1775 elif master_jstatus != job.backend.status[cachedParentId]: 1776 job.backend.status[cachedParentId] = master_jstatus 1777 1778 subjobdict = dict([ [str(subjob.id),subjob] for subjob in job.subjobs ]) 1779 1780 else: # this is the info for the node job 1781 1782 # subjob's node name is not available 1783 if 
not info['name']: continue 1784 1785 subjob = subjobdict[info['name'].replace('gsj_','')] 1786 1787 create_download_task = False 1788 1789 # skip updating the resubmitted jobs by comparing: 1790 # - the subjob's parent job id 1791 # - the parent id returned from status 1792 if cachedParentId != subjob.backend.parent_id: 1793 logger.debug('job %s has been resubmitted, ignore the status update.' % subjob.getFQID('.')) 1794 continue 1795 1796 # skip updating the cleared jobs 1797 if info['status'] == 'Cleared' and subjob.status in LCG._final_ganga_states: continue 1798 1799 # skip updating the jobs that are individually resubmitted after the original bulk submission 1800 if subjob.backend.flag == 1: 1801 logger.debug('job %s was resubmitted individually. skip updating it from the monitoring of its master job.' % subjob.getFQID('.')) 1802 # skip updating the jobs that are individually killed 1803 elif subjob.status == 'killed': 1804 logger.debug('job %s was killed individually. skip updating it from the monitoring of its master job.' % subjob.getFQID('.')) 1805 else: 1806 if not subjob.backend.id: 1807 # send out the subjob's id which is becoming available at the first time. 1808 # (a temporary workaround for fixing the monitoring issue of getting the job id) 1809 # Note: As the way of sending job id is implemented as an generic hook triggered 1810 # by the transition from 'submitting' to 'submitted'. For gLite bulk submission 1811 # the id is not available immediately right after the submission, therefore a late 1812 # job id transmission is needed. 1813 # This issue linked to the temporary workaround of setting subjob's status to 'submitted' 1814 # in the master_bulk_(re)submit() methods. In Ganga 5, a clear implementation should be 1815 # applied with the new lock mechanism. 1816 logger.debug('job %s obtained backend id, transmit it to monitoring service.' 
% subjob.getFQID('.')) 1817 subjob.backend.id = info['id'] 1818 subjob.getMonitoringService().submit() 1819 1820 # in the temporary workaround, there is no need to set job status to 'submitted' 1821 #subjob.updateStatus('submitted') 1822 1823 if subjob.backend.actualCE != info['destination']: 1824 logger.info('job %s has been assigned to %s',subjob.getFQID('.'),info['destination']) 1825 subjob.backend.actualCE = info['destination'] 1826 1827 if subjob.backend.status != info['status']: 1828 logger.info('job %s has changed status to %s',subjob.getFQID('.'),info['status']) 1829 subjob.backend.status = info['status'] 1830 subjob.backend.reason = info['reason'] 1831 subjob.backend.exitcode_lcg = info['exit'] 1832 if info['status'] == 'Done (Success)': 1833 create_download_task = True 1834 else: 1835 LCG.updateGangaJobStatus(subjob, info['status']) 1836 elif ( info['status'] == 'Done (Success)' ) and ( subjob.status not in LCG._final_ganga_states ): 1837 create_download_task = True 1838 1839 if create_download_task: 1840 # update to 'running' before changing to 'completing' 1841 if subjob.status == 'submitted': 1842 subjob.updateStatus('running') 1843 downloader = get_lcg_output_downloader() 1844 downloader.addTask(grid, subjob, True) 1845 1846 cnt_new_download_task += 1 1847 1848 if cnt_new_download_task > 0: 1849 downloader = get_lcg_output_downloader() 1850 logger.debug('%d new downloading tasks; %d alive downloading agents' % ( cnt_new_download_task, downloader.countAliveAgent() ) ) 1851 1852 # update master job status 1853 #if updateMasterStatus: 1854 # for mj in mjob_status_updatelist: 1855 # logger.debug('updating overall master job status: %s' % mj.getFQID('.')) 1856 # mj.updateMasterJobStatus() 1857 1858 master_bulk_updateMonitoringInformation = staticmethod(master_bulk_updateMonitoringInformation) 18591861 '''Update the proxy''' 1862 1863 mt = self.middleware.upper() 1864 return grids[mt].check_proxy()18651867 """Return any matches using the requirements or 
given jdlfile""" 1868 1869 jdl_file2 = jdl_file 1870 if not jdl_file: 1871 # create a dummy jdl file from the given requirements 1872 import tempfile 1873 jdl = {'VirtualOrganisation' : config['VirtualOrganisation'], 1874 'Executable' : os.path.basename(__file__), 1875 'InputSandbox' : [__file__], 1876 'Requirements' : self.requirements.convert()} 1877 1878 jdl_file_txt = Grid.expandjdl(jdl) 1879 1880 jdl_file2 = tempfile.mktemp('.jdl') 1881 file(jdl_file2, 'w').write(jdl_file_txt) 1882 1883 mt = self.middleware.upper() 1884 matches = grids[mt].list_match(jdl_file2, ce=spec_ce) 1885 1886 # clean up 1887 if not jdl_file: 1888 os.remove(jdl_file2) 1889 1890 return matches18911893 '''Extends the standard Job Configuration with additional attributes''' 18941913 1914 1915 # initialisation 1916 1917 # function for parsing VirtualOrganisation from ConfigVO1895 - def __init__(self,exe=None,inputbox=[],args=[],outputbox=[],env={},inputdata=[],requirements=None):1896 1897 self.inputdata=inputdata 1898 self.requirements=requirements 1899 1900 StandardJobConfig.__init__(self,exe,inputbox,args,outputbox,env)1901 19051907 1908 exe=self.getExeString() 1909 if os.path.dirname(exe) == '.': 1910 return os.path.basename(exe) 1911 else: 1912 return exe1919 re_vo = re.compile(r'.*VirtualOrganisation\s*=\s*"(.*)"') 1920 try: 1921 f = open(file) 1922 for l in f.readlines(): 1923 m = re_vo.match(l.strip()) 1924 if m: 1925 f.close() 1926 return m.groups()[0] 1927 except: 1928 raise Ganga.Utility.Config.ConfigError('ConfigVO %s does not exist.' 
% file )1929 1930 # configuration preprocessor : avoid VO switching1932 1933 if not opt in ['VirtualOrganisation','ConfigVO']: 1934 # bypass everything irrelevant to the VO 1935 return val 1936 elif opt == 'ConfigVO' and val == '': 1937 # accepting '' to disable the ConfigVO 1938 return val 1939 else: 1940 # try to get current value of VO 1941 if config['ConfigVO']: 1942 vo_1 = __getVOFromConfigVO__(config['ConfigVO']) 1943 else: 1944 vo_1 = config['VirtualOrganisation'] 1945 1946 # get the VO that the user trying to switch to 1947 if opt == 'ConfigVO': 1948 vo_2 = __getVOFromConfigVO__(val) 1949 else: 1950 vo_2 = val 1951 1952 # if the new VO is not the same as the existing one, raise ConfigError 1953 if vo_2 != vo_1: 1954 raise Ganga.Utility.Config.ConfigError('Changing VirtualOrganisation is not allowed in GANGA session.') 1955 1956 return val1957 1958 # configuration preprocessor : enabling middleware1960 1961 if opt in ['EDG_ENABLE','GLITE_ENABLE'] and val: 1962 mt = opt.split('_')[0] 1963 try: 1964 if config[opt]: 1965 logger.info('LCG-%s was already enabled.' % mt) 1966 else: 1967 grids[mt] = Grid(mt) 1968 return grids[mt].active 1969 except: 1970 raise Ganga.Utility.Config.ConfigError('Failed to enable LCG-%s.' % mt) 1971 1972 return val1973 1974 # configuration preprocessor : disabling middleware1976 1977 if opt in ['EDG_ENABLE','GLITE_ENABLE'] and not val: 1978 mt = opt.split('_')[0] 1979 grids[mt] = None 1980 if not config['EDG_ENABLE'] and not config['GLITE_ENABLE']: 1981 logger.warning('No middleware is enabled. LCG handler is disabled.') 1982 1983 return1984 1985 # configuration postprocessor : updating the configuration of the cached Grid objects1987 1988 ## update the config binded with the grid objects 1989 for mt in grids.keys(): 1990 try: 1991 ## NB. 
grids[mt] is None if the corresponding 1992 ## middleware is not enabled before 1993 grids[mt].config = getConfig('LCG') 1994 logger.debug('update grid configuration for %s' % mt) 1995 except AttributeError: 1996 pass 1997 1998 ## when user changes the 'DefaultLFC', change the env. variable, LFC_HOST, of the cached grid shells 1999 if opt == 'DefaultLFC' and val != None: 2000 for mt in grids.keys(): 2001 try: 2002 grids[mt].shell.env['LFC_HOST'] = val 2003 logger.debug('set env. variable LFC_HOST to %s' % val) 2004 except: 2005 pass 2006 return2007 2008 # configuration preprocessor 2013 2014 # configuration postprocessor2016 logger.info('%s has been set to %s' % (opt,val)) 2017 __disableMiddleware__(opt,val) 2018 __updateGridObjects__(opt,val) 2019 return2020 2021 # global variables 2022 logger = getLogger() 2023 2024 logger.debug('LCG module initialization: begin') 2025 2026 config = makeConfig('LCG','LCG/gLite/EGEE configuration parameters') 2027 #gproxy_config = getConfig('GridProxy_Properties') 2028 2029 # set default values for the configuration parameters 2030 config.addOption('EDG_ENABLE',False,'enables/disables the support of the EDG middleware') 2031 2032 config.addOption('EDG_SETUP', '/afs/cern.ch/sw/ganga/install/config/grid_env_auto.sh', \ 2033 'sets the LCG-UI environment setup script for the EDG middleware', \ 2034 filter=Ganga.Utility.Config.expandvars) 2035 2036 config.addOption('GLITE_ENABLE', True, 'Enables/disables the support of the GLITE middleware') 2037 2038 config.addOption('GLITE_SETUP', '/afs/cern.ch/sw/ganga/install/config/grid_env_auto.sh', \ 2039 'sets the LCG-UI environment setup script for the GLITE middleware', \ 2040 filter=Ganga.Utility.Config.expandvars) 2041 2042 config.addOption('VirtualOrganisation','dteam','sets the name of the grid virtual organisation') 2043 2044 config.addOption('ConfigVO','','sets the VO-specific LCG-UI configuration script for the EDG resource broker', \ 2045 filter=Ganga.Utility.Config.expandvars) 2046 
2047 config.addOption('Config','','sets the generic LCG-UI configuration script for the GLITE workload management system', \ 2048 filter=Ganga.Utility.Config.expandvars) 2049 2050 config.addOption('AllowedCEs','','sets allowed computing elements by a regular expression') 2051 config.addOption('ExcludedCEs','','sets excluded computing elements by a regular expression') 2052 2053 config.addOption('MyProxyServer','myproxy.cern.ch','sets the myproxy server') 2054 config.addOption('RetryCount',3,'sets maximum number of job retry') 2055 config.addOption('ShallowRetryCount',10,'sets maximum number of job shallow retry') 2056 2057 config.addOption('Rank','','sets the ranking rule for picking up computing element') 2058 config.addOption('ReplicaCatalog','','sets the replica catalogue server') 2059 config.addOption('StorageIndex','','sets the storage index') 2060 2061 config.addOption('DefaultSE','srm.cern.ch','sets the default storage element') 2062 config.addOption('DefaultSRMToken','','sets the space token for storing temporary files (e.g. 
oversized input sandbox)') 2063 config.addOption('DefaultLFC','prod-lfc-shared-central.cern.ch','sets the file catalogue server') 2064 config.addOption('BoundSandboxLimit',10 * 1024 * 1024,'sets the size limitation of the input sandbox, oversized input sandbox will be pre-uploaded to the storage element specified by \'DefaultSE\' in the area specified by \'DefaultSRMToken\'') 2065 2066 config.addOption('Requirements','Ganga.Lib.LCG.LCGRequirements','sets the full qualified class name for other specific LCG job requirements') 2067 2068 config.addOption('DataRequirements','','sets the DataRequirements of the job') 2069 2070 config.addOption('DataAccessProtocol', ['gsiftp'], 'sets the DataAccessProtocol') 2071 2072 config.addOption('SandboxCache','Ganga.Lib.LCG.LCGSandboxCache','sets the full qualified class name for handling the oversized input sandbox') 2073 2074 config.addOption('GliteBulkJobSize', 50, 'sets the maximum number of nodes (i.e. subjobs) in a gLite bulk job') 2075 2076 config.addOption('SubmissionThread', 10, 'sets the number of concurrent threads for job submission to gLite WMS') 2077 2078 config.addOption('SubmissionTimeout', 300, 'sets the gLite job submission timeout in seconds') 2079 2080 config.addOption('StatusPollingTimeout', 300, 'sets the gLite job status polling timeout in seconds') 2081 2082 config.addOption('OutputDownloaderThread', 10, 'sets the number of concurrent threads for downloading job\'s output sandbox from gLite WMS') 2083 2084 config.addOption('SandboxTransferTimeout', 60, 'sets the transfer timeout of the oversized input sandbox') 2085 2086 config.addOption('JobLogHandler', 'WMS', 'sets the way the job\'s stdout/err are being handled.') 2087 2088 config.addOption('MatchBeforeSubmit', False, 'sets to True will do resource matching before submitting jobs, jobs without any matched resources will fail the submission') 2089 2090 config.addOption('IgnoreGliteScriptHeader', False, 'sets to True will load script-based glite-wms-* 
commands forcely with current python, a trick for 32/64 bit compatibility issues.') 2091 2092 #config.addOption('JobExpiryTime', 30 * 60, 'sets the job\'s expiry time') 2093 2094 # apply preconfig and postconfig handlers 2095 config.attachUserHandler(__preConfigHandler__,__postConfigHandler__) 2096 2097 # startup two independent middleware environments for LCG 2098 grids = {'EDG':None,'GLITE':None} 2099 2100 if config['GLITE_ENABLE']: 2101 grids['GLITE'] = Grid('GLITE') 2102 # if grids['GLITE'].shell: 2103 # config.setSessionValue('DefaultLFC',grids['GLITE'].shell.env['LFC_HOST']) 2104 config.setSessionValue('GLITE_ENABLE',grids['GLITE'].active) 2105 2106 if config['EDG_ENABLE']: 2107 grids['EDG'] = Grid('EDG') 2108 # if grids['EDG'].shell: 2109 # config.setSessionValue('DefaultLFC', grids['EDG'].shell.env['LFC_HOST']) 2110 config.setSessionValue('EDG_ENABLE', grids['EDG'].active) 2111 2112 logger.debug('LCG module initialization: end') 2113 2114 # $Log: not supported by cvs2svn $ 2115 # Revision 1.38 2009/07/15 08:23:29 hclee 2116 # add resource match-making as an option before doing real job submission to WMS. 
2117 # - this option can be activated by setting config.LCG.MatchBeforeSubmit = True 2118 # 2119 # Revision 1.37 2009/06/24 19:12:48 hclee 2120 # add support for two JDL attributes: DataRequirements & DataAccessProtocol 2121 # 2122 # Revision 1.36 2009/06/09 15:41:44 hclee 2123 # bugfix: https://savannah.cern.ch/bugs/?50589 2124 # 2125 # Revision 1.35 2009/06/05 12:23:15 hclee 2126 # bugfix for https://savannah.cern.ch/bugs/?51298 2127 # 2128 # Revision 1.34 2009/03/27 10:14:33 hclee 2129 # fix race condition issue: https://savannah.cern.ch/bugs/?48435 2130 # 2131 # Revision 1.33 2009/03/12 12:26:16 hclee 2132 # merging bug fixes from branch Ganga-LCG-old-MTRunner to trunk 2133 # 2134 # Revision 1.32 2009/03/12 12:17:31 hclee 2135 # adopting GangaThread in Ganga.Core 2136 # 2137 # Revision 1.31 2009/02/25 08:39:20 hclee 2138 # introduce and adopt the basic class for Ganga multi-thread handler 2139 # 2140 # Revision 1.30.2.2 2009/03/03 13:23:43 hclee 2141 # failing Ganga jobs if the corresponding glite jobs have been removed from WMS 2142 # 2143 # Revision 1.30.2.1 2009/03/03 12:42:54 hclee 2144 # set Ganga job to fail if the corresponding glite jobs have been removed from WMS 2145 # 2146 # Revision 1.30 2009/02/16 14:10:05 hclee 2147 # change basedir of DQ2SandboxCache from users to userxx where xx represents the last two digits of year 2148 # 2149 # Revision 1.29 2009/02/05 19:35:36 hclee 2150 # GridSandboxCache enhancement: 2151 # - put cached file information in job repository (instead of __iocache__ file) 2152 # - add and expose method: list_cached_files() 2153 # 2154 # Revision 1.28 2009/02/05 09:00:40 hclee 2155 # add AllowZippedISB=false to glite JDL 2156 # - workaround for WMS bug: https://savannah.cern.ch/bugs/index.php?32345 2157 # 2158 # Revision 1.27 2009/02/04 17:01:02 hclee 2159 # enhancement for bug: https://savannah.cern.ch/bugs/?43502 2160 # 2161 # Revision 1.26 2009/01/26 16:11:33 hclee 2162 # modification for handling stdout/err in different ways 
2163 # - add config.LCG.JobLogHandler, default value is 'WMS', meaning that stdout/err 2164 # will be shipped back to user via WMS's output sandbox mechanism 2165 # - set config.LCG.JobLogHandler to other values will remove stdout/err from WMS's output sandbox 2166 # and the application can pick it up accordingly to handle stdout/err in different ways 2167 # (e.g. store it in a DQ2 dataset) 2168 # 2169 # Revision 1.25 2009/01/16 09:15:11 hclee 2170 # fix for glite perusable function 2171 # 2172 # Revision 1.24 2009/01/15 13:16:31 hclee 2173 # killing partially submitted bulk jobs on WMS immediately if the whole job submission is not done properly 2174 # 2175 # Revision 1.23 2008/12/11 11:14:33 hclee 2176 # clean up logging messages 2177 # 2178 # Revision 1.22 2008/12/11 09:15:31 hclee 2179 # allow to set the max. node number of a glite bulk job 2180 # 2181 # Revision 1.21 2008/12/08 08:44:52 hclee 2182 # make the number of output downloader threads configurable 2183 # 2184 # Revision 1.20 2008/11/25 15:26:07 hclee 2185 # introducing "SubmissionThread" configuration variable for setting the concurrent 2186 # number of job submission threads 2187 # 2188 # Revision 1.19 2008/11/13 11:34:23 hclee 2189 # update master job's status at the end of the master_updateMonitorInformation() in any case 2190 # 2191 # Revision 1.18 2008/11/07 13:02:25 hclee 2192 # expand $VAR and '~' when setting path-like options 2193 # 2194 # Revision 1.17 2008/11/05 13:51:03 hclee 2195 # fix the bug in passing LFC_HOST to the job wrapper while using LCGSandboxCache 2196 # 2197 # Revision 1.16 2008/11/05 10:20:58 hclee 2198 # fix the bug triggering the annoying warning message after subjob resubmission 2199 # 2200 # Revision 1.15 2008/11/03 15:27:48 hclee 2201 # enhance the internal setup for the SandboxCache 2202 # 2203 # Revision 1.14 2008/10/08 07:42:47 hclee 2204 # avoid doing glite-wms-job-cancel on jobs which is in a final state 2205 # - glite bulk job status is now correctly stored as 
master job's status 2206 # 2207 # Revision 1.13 2008/09/30 17:51:08 hclee 2208 # fine tune the typelist attribute in the schema 2209 # 2210 # Revision 1.12 2008/09/29 13:17:55 hclee 2211 # fix the type checking issue 2212 # 2213 # Revision 1.11 2008/09/23 12:29:32 hclee 2214 # fix the status update logic 2215 # 2216 # Revision 1.10 2008/09/22 22:43:41 hclee 2217 # cache the logging information coming out from the LCGOutputDownloader threads 2218 # 2219 # Revision 1.9 2008/09/19 11:45:19 hclee 2220 # turn off debug message of the MTRunner objects 2221 # try to avoid the race condition among concurrent threads 2222 # 2223 # Revision 1.8 2008/09/18 16:34:58 hclee 2224 # improving job submission/output fetching performance 2225 # 2226 # Revision 1.7 2008/09/15 20:42:38 hclee 2227 # improve sandbox cache handler and adopt it in the LCG backend 2228 # 2229 # Revision 1.6 2008/09/04 14:00:34 hclee 2230 # fix the type-checking issue when setting up CE attribute 2231 # 2232 # Revision 1.5 2008/08/12 13:57:42 hclee 2233 # - remove redundant functions 2234 # - set minimum timeout of downloading oversized inputsandbox to 60 secs. 
2235 # 2236 # Revision 1.4 2008/08/12 12:37:37 hclee 2237 # - improving oversized inputsandbox downloading 2238 # * add more debug information 2239 # * automatically determine the lcg-cp timeout assuming the rate of 1MB/sec 2240 # * add config.LCG.SandboxTransferTimeout allowing user to set it manually 2241 # 2242 # Revision 1.3 2008/07/30 10:27:22 hclee 2243 # fix indentation issue in the code 2244 # 2245 # Revision 1.2 2008/07/28 11:00:55 hclee 2246 # patching up to the up-to-date development after CVS migration 2247 # 2248 # Revision 1.95.4.12 2008/07/15 11:51:42 hclee 2249 # bug fix: https://savannah.cern.ch/bugs/?37825 2250 # 2251 # Revision 1.95.4.11 2008/07/09 13:26:08 hclee 2252 # bug fix of https://savannah.cern.ch/bugs/index.php?38368 2253 # - ignoring configuration postprocess on the grid object corresponding to a 2254 # disabled middleware 2255 # 2256 # Revision 1.95.4.10 2008/07/09 13:10:18 hclee 2257 # apply the patch of feature request: https://savannah.cern.ch/bugs/?37825 2258 # - using scratch directory as job's working directory 2259 # 2260 # Revision 1.95.4.9 2008/05/15 16:01:08 hclee 2261 # - bugfix #36178 (subprocess in python2.5) 2262 # 2263 # Revision 1.95.4.8 2008/05/08 13:28:06 hclee 2264 # gzipped stdout stderr 2265 # 2266 # Revision 1.95.4.7 2008/03/31 15:56:27 hclee 2267 # merge the srmv2 space token support made in Ganga4 branch 2268 # 2269 # Revision 1.95.4.6 2008/03/07 12:27:31 hclee 2270 # distinguish application exitcode and middleware exitcode in schema 2271 # - exitcode: application exitcode 2272 # - exitcode_lcg: middleware exitcode 2273 # 2274 # Revision 1.95.4.5 2008/02/06 17:05:01 hclee 2275 # add descriptions of configuration attributes 2276 # 2277 # Revision 1.95.4.4 2008/02/06 11:21:20 hclee 2278 # merge 4.4 and 5.0 and fix few issues 2279 # 2280 # Revision 1.95.4.3 2007/12/11 09:54:30 amuraru 2281 # moved GLITE_SETUP and EDG_SETUP to LCG module 2282 # 2283 # Revision 1.95.4.2 2007/12/10 
18:05:13 amuraru 2284 # merged the 4.4.4 changes 2285 # 2286 # Revision 1.95.4.1 2007/10/12 13:56:25 moscicki 2287 # merged with the new configuration subsystem 2288 # 2289 # Revision 1.95.6.3 2007/10/12 08:16:50 roma 2290 # Migration to new Config 2291 # 2292 # Revision 1.95.6.2 2007/10/09 15:06:47 roma 2293 # Migration to new Config 2294 # 2295 # Revision 1.95.6.1 2007/09/25 09:45:12 moscicki 2296 # merged from old config branch 2297 # 2298 # Revision 1.111 2007/12/04 17:26:19 hclee 2299 # fix small typo 2300 # 2301 # Revision 1.110 2007/12/04 17:19:42 hclee 2302 # - fix bugs in updating bulk job's status 2303 # - fix status parser for gLite 3.1 2304 # 2305 # Revision 1.109 2007/12/04 15:53:49 moscicki 2306 # separated Grid class into another module 2307 # added optional import of GridSimulator class 2308 # 2309 # Revision 1.108 2007/11/30 11:31:12 hclee 2310 # - improve the job id parser in the submit method 2311 # - remove the warning message for individual subjob submission/killing 2312 # 2313 # Revision 1.107 2007/11/29 13:57:40 hclee 2314 # fill up subjob ids in the monitoring loop 2315 # 2316 # Revision 1.106 2007/11/23 15:22:52 hclee 2317 # add performance profiler 2318 # 2319 # Revision 1.105 2007/11/09 03:12:39 hclee 2320 # bug fix on job id parser for edg-job-submit command 2321 # 2322 # Revision 1.104 2007/11/08 02:40:31 hclee 2323 # fix the bug of parsing job id of edg-job-submit, remove the heading white spaces before parsing 2324 # 2325 # Revision 1.103 2007/10/23 12:18:43 hclee 2326 # fix the subjob ordering issue of the glite collective job 2327 # 2328 # Revision 1.102 2007/10/19 14:43:14 hclee 2329 # use -i in LCG command to kill multiple subjobs which are individually resubmitted 2330 # 2331 # Revision 1.101 2007/10/19 14:32:39 hclee 2332 # bug fix for resubmission and kill on individual subjob 2333 # 2334 # Revision 1.100 2007/10/19 12:34:21 hclee 2335 # - improving the control of the resubmission of each individual subjob submitted through 
glite-bulk job 2336 # - enabling kill() on each individual subjob submitted through glite-bulk job 2337 # - updating job.info.submit_count on subjobs in submit and resubmit methods 2338 # 2339 # Revision 1.99 2007/10/11 12:00:16 hclee 2340 # support job resubmission on the glite subjobs 2341 # 2342 # Revision 1.98 2007/10/08 16:21:01 hclee 2343 # - introduce "ShallowRetryCount" JDL attribute and set default to 10 2344 # - use the subprocess module to launch the application executable in the job wrapper 2345 # 2346 # Revision 1.97 2007/09/25 13:22:19 hclee 2347 # implement the peek method with Octopus monitoring service 2348 # 2349 # Revision 1.114 2008/01/18 15:24:16 hclee 2350 # - integrate job perusal feature implemented by Philip 2351 # - fix bugs in backend.loginfo() and backend.inspect() 2352 # 2353 # Revision 1.113 2008/01/10 11:46:54 hclee 2354 # - disable the JDL attribute "ExpiryTime" to avoid the immediate crash of the resubmitted jobs 2355 # - merge the modification for enabling glite job perusal feature (contributed by Philip Rodrigues) 2356 # 2357 # Revision 1.112 2007/12/14 11:32:58 hclee 2358 # fix the broken bulk submission - add temporary workaround to avoid the master job's state transition from 'submitted' to 'submitting' 2359 # 2360 # Revision 1.111 2007/12/04 17:26:19 hclee 2361 # fix small typo 2362 # 2363 # Revision 1.110 2007/12/04 17:19:42 hclee 2364 # - fix bugs in updating bulk job's status 2365 # - fix status parser for gLite 3.1 2366 # 2367 # Revision 1.109 2007/12/04 15:53:49 moscicki 2368 # separated Grid class into another module 2369 # added optional import of GridSimulator class 2370 # 2371 # Revision 1.108 2007/11/30 11:31:12 hclee 2372 # - improve the job id parser in the submit method 2373 # - remove the warning message for individual subjob submission/killing 2374 # 2375 # Revision 1.107 2007/11/29 13:57:40 hclee 2376 # fill up subjob ids in the monitoring loop 2377 # 2378 # Revision 1.106 2007/11/23 15:22:52 hclee 2379 # add 
performance profiler 2380 # 2381 # Revision 1.105 2007/11/09 03:12:39 hclee 2382 # bug fix on job id parser for edg-job-submit command 2383 # 2384 # Revision 1.104 2007/11/08 02:40:31 hclee 2385 # fix the bug of parsing job id of edg-job-submit, remove the heading white spaces before parsing 2386 # 2387 # Revision 1.103 2007/10/23 12:18:43 hclee 2388 # fix the subjob ordering issue of the glite collective job 2389 # 2390 # Revision 1.102 2007/10/19 14:43:14 hclee 2391 # use -i in LCG command to kill multiple subjobs which are individually resubmitted 2392 # 2393 # Revision 1.101 2007/10/19 14:32:39 hclee 2394 # bug fix for resubmission and kill on individual subjob 2395 # 2396 # Revision 1.100 2007/10/19 12:34:21 hclee 2397 # - improving the control of the resubmission of each individual subjob submitted through glite-bulk job 2398 # - enabling kill() on each individual subjob submitted through glite-bulk job 2399 # - updating job.info.submit_count on subjobs in submit and resubmit methods 2400 # 2401 # Revision 1.99 2007/10/11 12:00:16 hclee 2402 # support job resubmission on the glite subjobs 2403 # 2404 # Revision 1.98 2007/10/08 16:21:01 hclee 2405 # - introduce "ShallowRetryCount" JDL attribute and set default to 10 2406 # - use the subprocess module to launch the application executable in the job wrapper 2407 # 2408 # Revision 1.97 2007/09/25 13:22:19 hclee 2409 # implement the peek method with Octopus monitoring service 2410 # 2411 # Revision 1.95 2007/08/09 14:01:45 kuba 2412 # fixed the logic of dynamic requirements loading (fix from Johannes) 2413 # 2414 # Revision 1.94 2007/08/09 11:03:45 kuba 2415 # protection for passing non-strings to printError and printWarning functions 2416 # 2417 # Revision 1.93 2007/08/01 13:39:27 hclee 2418 # replace old glite-job-* commands with glite-wms-job-* commands 2419 # 2420 # Revision 1.92 2007/07/27 15:13:39 moscicki 2421 # merged the monitoring services branch from kuba 2422 # 2423 # Revision 1.91 2007/07/25 14:08:07 
hclee 2424 # - combine the query for glite subjob id (right after the job submission) with the hook of sending monitoring information to Dashboard 2425 # - improve the debug message in the job wrapper 2426 # 2427 # Revision 1.90 2007/07/24 13:53:11 hclee 2428 # query for subjob ids right after the glite bulk submission 2429 # 2430 # Revision 1.89 2007/07/16 15:42:16 hclee 2431 # - move LCGRequirements out from LCG class 2432 # - add config['LCG']['Requirements'] attribute, default to the LCGRequirements class 2433 # - dynamic loading of the requirements module, allowing applications to override merge() and convert() methods for app specific requirement based on the GLUE schema 2434 # 2435 # Revision 1.88 2007/07/10 13:08:32 moscicki 2436 # docstring updates (ganga devdays) 2437 # 2438 # Revision 1.87 2007/07/03 10:05:10 hclee 2439 # pass the GridShell instance to GridCache for pre-staging oversized inputsandbox 2440 # 2441 # Revision 1.86.2.1 2007/06/21 15:04:24 moscicki 2442 # improvement of the monitoring services interface 2443 # 2444 # Revision 1.86 2007/06/15 08:42:59 hclee 2445 # - adopt the Credential plugin to get the voname from the voms proxy 2446 # - modify the logic of the Grid.check_proxy() method 2447 # 2448 # Revision 1.85 2007/06/06 18:56:38 hclee 2449 # bug fix 2450 # 2451 # Revision 1.84 2007/06/06 15:21:52 hclee 2452 # fix the issue that if the grids['EDG'] and grids['GLITE'] not properly created on the machine without UI installation 2453 # 2454 # Revision 1.83 2007/06/05 16:43:06 hclee 2455 # get default lfc_host from lcg-infosites utility 2456 # 2457 # Revision 1.82 2007/06/05 15:06:22 hclee 2458 # add a post-config hook for setting corresponding env. 
variables of the cached GridShells 2459 # - for instance, only config['LCG']['DefaultLFC'] affects GridShell.env['LFC_HOST'] 2460 # 2461 # Revision 1.81 2007/05/30 16:17:26 hclee 2462 # check the exit code of the real executable (bug #26290) 2463 # 2464 # Revision 1.80 2007/05/23 15:43:24 hclee 2465 # - introduce 'DefaultLFC' configuration property 2466 # - check the exit code from real executable (bug #26290) 2467 # - pass local 'LFC_HOST' environment variable to grid WNs (bug #26443) 2468 # 2469 # Revision 1.79 2007/05/10 10:05:14 liko 2470 # Use srm.cern.ch for big sandbox and do not overwrite X509_USER_PROXY 2471 # 2472 # Revision 1.78.4.1 2007/06/18 07:44:56 moscicki 2473 # config prototype 2474 # 2475 # Revision 1.78 2007/04/05 14:30:19 hclee 2476 # - fix the bug in distinguishing master and node jobs of the glite bulk submission 2477 # - add logic for handling master_resubmit and master_cancel for glite bulk jobs 2478 # 2479 # Revision 1.77 2007/04/05 07:13:01 hclee 2480 # allow users to call the 'cleanup_iocache()' method when job is in 'completed' and 'failed' status 2481 # 2482 # Revision 1.76 2007/03/23 03:45:02 hclee 2483 # remove CVS confliction marks 2484 # 2485 # Revision 1.75 2007/03/23 03:41:24 hclee 2486 # merge modifications in 4.2.2-bugfix-branch 2487 # 2488 # Revision 1.74 2007/01/31 11:13:52 hclee 2489 # remove the python path prepending when calling edg or glite UI commands 2490 # 2491 # Revision 1.73 2007/01/23 17:32:44 hclee 2492 # input sandbox pre-upload is workable for gLite bulk submission 2493 # 2494 # Revision 1.72 2007/01/23 11:45:58 hclee 2495 # the inputsandbox pre-upload takes into account the shared inputsandbox 2496 # - the shared inputsandbox will not be uploaded again if it has been existing on the remote iocache 2497 # add and export cleanup_iocache() method for deleting the pre-uploaded input sandboxes 2498 # - if the job is not "completed", the operation will be simply ignored with some warning message 2499 # 2500 # 
Revision 1.71 2007/01/22 16:22:10 hclee 2501 # the workable version for remote file cache using lcg-utils 2502 # 2503 # Revision 1.70 2007/01/17 17:54:36 hclee 2504 # working for file upload 2505 # 2506 # Revision 1.69 2007/01/16 16:58:37 hclee 2507 # In the middle of implementing large inputsandbox support 2508 # 2509 # Revision 1.68 2007/01/16 15:31:11 hclee 2510 # Adopt the GridCache object for remote file I/O 2511 # 2512 # Revision 1.67 2006/12/14 08:53:03 hclee 2513 # add file upload/download/delete methods 2514 # 2515 # Revision 1.66 2006/12/13 13:17:19 hclee 2516 # merge the modifications in the 4-2-2 bugfix branch 2517 # 2518 # Revision 1.65 2006/11/02 13:35:49 hclee 2519 # add resubmission implementations 2520 # 2521 # Revision 1.63.2.8 2006/12/13 12:52:40 hclee 2522 # add _GPI_Prefs 2523 # 2524 # Revision 1.63.2.7 2006/11/22 20:39:10 hclee 2525 # make sure the numerical values of requirements are correctly converted into string 2526 # 2527 # Revision 1.63.2.6 2006/11/22 15:40:16 hclee 2528 # Make a more clear instruction for calling check_proxy method 2529 # 2530 # Revision 1.63.2.5 2006/11/03 15:57:18 hclee 2531 # introduce the environmental variable, GANGA_LCG_CE, for monitoring purpose 2532 # if the backend.CE is specified by the user 2533 # 2534 # Revision 1.63.2.4 2006/11/03 13:19:09 hclee 2535 # rollback unintentional commit to exclude the resubmission feature 2536 # 2537 # Revision 1.63.2.3 2006/11/02 13:25:27 hclee 2538 # implements the resubmit methods for both EDG and GLITE modes 2539 # 2540 # Revision 1.63.2.2 2006/10/26 14:14:46 hclee 2541 # include the monitoring component 2542 # 2543 # Revision 1.63.2.1 2006/10/26 13:33:36 hclee 2544 # - accept the verbosity argument when use backend.loginfo() 2545 # - the backend.loginfo() method returns a filename of the saved logging info instead of printing out of the plain text of the logging info 2546 # 2547 # Revision 1.63 2006/10/24 12:53:48 hclee 2548 # skip taking VO name from the voms proxy if 
using EDG middleware 2549 # 2550 # Revision 1.62 2006/10/12 13:00:27 hclee 2551 # - for subjobs, change to status 'submitting' before changing to 'submitted' 2552 # 2553 # Revision 1.61 2006/10/09 10:38:39 hclee 2554 # Simplify the usage of the "Grid" objects 2555 # 2556 # Revision 1.60 2006/10/09 09:37:43 hclee 2557 # voms attributes in the proxy takes precedence for VO detection in composing job submission command 2558 # 2559 # Revision 1.59 2006/10/09 09:14:43 hclee 2560 # Appending "MPICH" requirements instead of overriding 2561 # 2562 # Revision 1.58 2006/10/06 08:05:08 hclee 2563 # Add supports for multiple job types (Normal, MPICH, Interactive) 2564 # 2565 # Revision 1.57 2006/10/05 09:12:42 hclee 2566 # - add default value of the configurable parameters 2567 # - simplify the code accordingly by removing the checking of the existence of the configurable parameters 2568 # - expose the exitcode of the real executable inside the job wrapper 2569 # 2570 # Revision 1.56 2006/09/28 14:36:56 hclee 2571 # remove the redundant __credential_validity__ method 2572 # change some message in the submit function to debug level info 2573 # 2574 # Revision 1.55 2006/09/18 09:48:46 hclee 2575 # add "-r" option in job submission command if CE is specified (bypassing the RB match-making) 2576 # change the name of some private method: Grid.proxy_voname() -> Grid.__get_proxy_voname__() 2577 # change the argument of the Grid.__credential_validity__() method. Replace "value" with "type". 2578 # 2579 # Revision 1.54 2006/09/11 12:33:29 hclee 2580 # job status rolls back to "failed" if output fetching fails. 
2581 # 2582 # Revision 1.53 2006/09/06 15:08:54 hclee 2583 # Catch and print the log file of grid commands 2584 # 2585 # Revision 1.52 2006/08/28 15:20:32 hclee 2586 # - integrate shared inputsandbox for glite bulk submission 2587 # - small fixes in job wrapper 2588 # 2589 # Revision 1.51 2006/08/24 16:48:24 moscicki 2590 # - master/subjob sandbox support 2591 # - fixes in the config for setting VO 2592 # 2593 # Revision 1.50 2006/08/22 12:06:30 hclee 2594 # unpack the output sandbox tarball after getting output 2595 # 2596 # Revision 1.49 2006/08/21 10:31:55 hclee 2597 # set PATH environment to search current working directory in the job wrapper 2598 # 2599 # Revision 1.48 2006/08/18 16:08:05 hclee 2600 # small fix for vo switching 2601 # 2602 # Revision 1.47 2006/08/18 13:46:00 hclee 2603 # update for the bugs: 2604 # - #19122: use jobconfig.getExeString() to get correct path of exeutable 2605 # - #19067: use an enhanced system call handler implemented in the Local handler to better control the stdout/stderr 2606 # - #19155: job submission/cancelling/monitoring will be just failed if no proxy is available 2607 # 2608 # Revision 1.46 2006/08/16 15:15:33 hclee 2609 # fix the path problem of the actual executable in job wrapper 2610 # 2611 # Revision 1.45 2006/08/15 11:10:01 hclee 2612 # - reduce verbosity 2613 # - correct the way to specify default configuration attributes 2614 # 2615 # Revision 1.44 2006/08/10 13:39:50 moscicki 2616 # using Sandbox mechanism 2617 # 2618 # Revision 1.43 2006/08/09 14:36:10 hclee 2619 # - use ProxyTimeLeft and ProxyTimeValid in proxy creating and checking 2620 # - in submit and cancel methods, check_proxy is called if no valid proxy available 2621 # 2622 # Revision 1.42 2006/08/09 11:07:32 hclee 2623 # - use getCredential method to create a credential 2624 # - enhancement in get_output() method 2625 # 2626 # Revision 1.41 2006/08/08 21:44:23 hclee 2627 # change wrapper log format 2628 # 2629 # Revision 1.40 2006/08/08 21:23:40 hclee 
2630 # Change format of the wrapper log 2631 # 2632 # Revision 1.39 2006/08/08 20:02:36 hclee 2633 # - Add job wrapper 2634 # - modify the loop of backend status update 2635 # - use GridShell module to create Shell object 2636 # 2637 # Revision 1.38 2006/08/08 14:23:49 hclee 2638 # - Integrate with Credential module 2639 # - Add method for getting Shell objects 2640 # - In the middle of the job wrapper implementation 2641 # 2642 # Revision 1.37 2006/07/31 13:25:55 hclee 2643 # replace the code of master job update with the factored out method: updateMasterJobStatus() 2644 # 2645 # Revision 1.36 2006/07/31 13:06:21 hclee 2646 # Integration with state machine 2647 # few bug fixes 2648 # 2649 # Revision 1.35 2006/07/20 21:06:15 hclee 2650 # - remove existing "jdlrepos" directory of bulk job 2651 # 2652 # Revision 1.34 2006/07/20 20:51:59 hclee 2653 # - return False if bulk submission failed 2654 # 2655 # Revision 1.33 2006/07/19 17:06:20 hclee 2656 # initial implementation for gLite bulk submission 2657 # 2658 # Revision 1.32 2006/07/18 15:09:59 hclee 2659 # Supporting both EDG and GLITE middlewares in LCG handler 2660 # 2661 # Revision 1.31 2006/07/17 10:14:29 hclee 2662 # merge Alvin's patch for the version (Ganga-LCG-1-1) in Ganga release 4-2-0-beta2 2663 # 2664 # Revision 1.30 2006/07/10 13:12:59 moscicki 2665 # changes from Johannes: outputdata handling and a bugfix 2666 # 2667 # Revision 1.29 2006/07/07 14:27:01 hclee 2668 # Fix the scenario of VO check in the __avoidVOSwitch__ function 2669 # 2670 # Revision 1.28 2006/07/07 12:04:11 hclee 2671 # Avoid VO switching in GANGA session 2672 # 2673 # Revision 1.27 2006/07/04 11:41:36 hclee 2674 # Add internal function in Grid object for setting up the edg-job-submit options 2675 # - effective configurations are used in composing the options 2676 # - more virtual organisation checks 2677 # - the function will be called everytime the submit() function is called 2678 # 2679 # Revision 1.27 2006/07/03 13:55:30 hclee 2680 
# Add internal function in Grid object for setting up the edg-job-submit options 2681 # - effective configurations are used in composing the options 2682 # - more virtual organisation checks 2683 # - the function will be called everytime the submit() function is called 2684 # 2685 # Revision 1.26 2006/06/07 17:16:02 liko 2686 # Additional logic for the cleared state 2687 # 2688 # Revision 1.25 2006/06/07 17:15:44 liko 2689 # Additional logic for the cleared state 2690 # 2691 # Revision 1.24 2006/05/31 10:12:17 liko 2692 # Add Cleared 2693 # 2694 # Revision 1.23 2006/05/19 22:11:59 liko 2695 # Add status Submitted 2696 # 2697 # Revision 1.22 2006/05/18 15:38:31 liko 2698 # : 2699 # 2700 # Revision 1.21 2006/05/15 16:39:30 liko 2701 # Done (Failed) is not final state ... 2702 # 2703 # Revision 1.20 2006/05/08 11:50:53 liko 2704 # Include changes by Johannes 2705 # 2706 # Revision 1.19 2006/04/27 09:13:25 moscicki 2707 # 2708 # PREFIX_HACK: 2709 # work around inconsistency of LCG setup script and commands: 2710 # LCG commands require python2.2 but the setup script does not set this version of python. 
If another version of python is used (like in GUI), then python2.2 runs against the wrong python libraries. This should possibly be fixed in LCG: either remove python2.2 from command scripts or make setup script force correct version of python 2711 # 2712 # Revision 1.18 2006/04/24 17:30:02 liko 2713 # Several bug fixes 2714 # 2715 # Revision 1.17 2006/03/20 10:01:53 liko 2716 # Fix retry count 2717 # 2718 # Revision 1.16 2006/03/17 00:55:19 liko 2719 # Fix problem with replica catalog 2720 # 2721 # Revision 1.15 2006/03/17 00:06:55 liko 2722 # defaults for config attributes ReplicaCatalog 2723 # 2724 # Revision 1.14 2006/03/16 23:53:12 liko 2725 # Fix stupid proxy message 2726 # 2727 # Revision 1.13 2006/02/10 14:38:37 moscicki 2728 # replaced KeyError by ConfigError 2729 # 2730 # fixed: bug #13462 overview: stdin and stdout are unconditionally added to OutputSandbox 2731 # 2732 # fixed: edg-job-cancel with the new release of LCG asks an interactive question which made Ganga "hang" on it, --noint option added wherever possible 2733 # 2734 # Revision 1.12 2006/02/07 13:02:33 liko 2735 # 2736 # 1) Fix problem with conflicting requirements definitions 2737 # 2) Fix problem with AllowedCEs in configuration 2738 # 3) Support for LFC in Athena handler 2739 # 2740 # Revision 1.11 2005/11/08 09:15:05 liko 2741 # Fix a bug in the handling of the environment 2742 # 2743 # Revision 1.10 2005/10/21 13:19:09 moscicki 2744 # fixed: kill should return the boolean success code 2745 # 2746 # Revision 1.9 2005/10/11 11:56:37 liko 2747 # Default values for new configuration file 2748 # 2749 # Revision 1.8 2005/09/22 21:41:15 liko 2750 # Add Cleared status 2751 # 2752 # Revision 1.7 2005/09/21 09:05:58 andrew 2753 # Added a retry mechanism to the 'proxy-init' call. Now the user has 2754 # 3 retries before giving up. 2755 # 2756 # Revision 1.6 2005/09/06 11:37:13 liko 2757 # Mainly the Athena handler 2758 # 2759 # Revision 1.5 2005/09/02 12:46:10 liko 2760 # Extensively updated version 2761
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Mon Jun 25 10:35:37 2012 | http://epydoc.sourceforge.net |