Package Ganga :: Package Lib :: Package LCG :: Module Grid
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.LCG.Grid

import os
import re
import sys
import tempfile
import time
from types import *

from Ganga.GPIDev.Schema import *
from Ganga.GPIDev.Lib.File import *
from Ganga.GPIDev.Credentials import getCredential

from Ganga.Utility.Config import getConfig, ConfigError
from Ganga.Utility.logging import getLogger

from Ganga.Utility.GridShell import getShell

from Ganga.Lib.LCG.GridftpSandboxCache import GridftpFileIndex, GridftpSandboxCache

from Ganga.Lib.LCG.Utility import *
  18   
  19  # global variables 
  20  logger = getLogger() 
  21   
class Grid(object):
    '''Helper class to implement grid interaction

    One instance drives the command-line tools of a single middleware
    flavour ('EDG' or 'GLITE') through the shell object returned by
    getShell().  Every grid command is run with self.shell.cmd1() and its
    textual output is parsed with regular expressions.
    '''

    # attributes
    _attributes = ('middleware', 'credential', 'config', 'active', 'perusable')

    def __init__(self, middleware='EDG'):
        '''Set up the shell and the credential for the given middleware.

        middleware -- middleware name; it is upper-cased before use.

        self.active stays False when no shell could be obtained or when
        check_proxy() fails.
        '''
        self.active = False

        self.re_token = re.compile(r'^token:(.*):(.*)$')

        self.credential = None

        self.middleware = middleware.upper()

        self.perusable = False

        self.config = getConfig('LCG')

        # check that UI has been set up
        # start up a shell object specific to the middleware
        self.shell = getShell(self.middleware)

        # CREAM proxy delegations, keyed by CE endpoint:
        # ce -> [delegation id, expiry time in seconds since the epoch]
        self.proxy_id = {}

        if not self.shell:
            logger.warning('LCG-%s UI has not been configured. The plugin has been disabled.' % self.middleware)
            return

        # create credential for this Grid object
        self.active = self.check_proxy()

    def __setattr__(self, attr, value):
        '''Set an attribute; re-create the shell when 'config' is replaced.'''
        object.__setattr__(self, attr, value)
        # dynamic update the internal shell object if the config attribute is reset
        if attr == 'config':
            self.shell = getShell(self.middleware)

    def __get_cmd_prefix_hack__(self, binary=False):
        '''Return the prefix to put in front of a middleware command name.

        binary -- when False the prefix additionally starts with 'python '
                  so that script-based commands run under the current python.
        '''
        # this is to work around inconsistency of LCG setup script and commands:
        # LCG commands require python2.2 but the setup script does not set this version of python
        # if another version of python is used (like in GUI), then python2.2 runs against wrong python libraries
        # possibly should be fixed in LCG: either remove python2.2 from command scripts or make setup script force
        # correct version of python
        prefix_hack = "${%s_LOCATION}/bin/" % self.middleware

        # some script-based glite-wms commands (status and logging-info) requires (#/usr/bin/env python2)
        # which leads to a python conflict problem.
        if not binary:
            prefix_hack = 'python ' + prefix_hack

        return prefix_hack

    def __write_jobids_file__(self, jobids, header=''):
        '''Write job ids, one per line, into a new temporary file.

        jobids -- sequence of job id strings
        header -- text written verbatim before the ids

        Returns the path of the file.  The caller is responsible for
        removing it, see __remove_file__().
        '''
        # mkstemp creates the file atomically, unlike the race-prone mktemp
        fd, idsfile = tempfile.mkstemp('.jids')
        f = os.fdopen(fd, 'w')
        try:
            f.write(header + '\n'.join(jobids) + '\n')
        finally:
            f.close()
        return idsfile

    def __remove_file__(self, path):
        '''Remove a file; a failure is only logged at debug level.'''
        try:
            os.remove(path)
        except OSError:
            # best-effort cleanup: a leftover temporary file must not fail the grid operation
            logger.debug('cannot remove file %s' % path)

    def __set_submit_option__(self):
        '''Compose the VO/config command-line options for submission.

        Returns the option string padded with one blank on each side, or
        None when no virtual organisation can be determined.  Raises
        ConfigError when the EDG 'ConfigVO' file does not exist.
        '''
        # find out how the VO has been specified
        submit_option = ''

        msg = 'use VO defined '

        voms = self.__get_proxy_voname__()

        # VO specific WMS options (no longer used by glite-wms-job-submit command)
        # 1. vo specified in the configuration file
        if self.config['ConfigVO']:
            if self.middleware == 'EDG':
                submit_option = '--config-vo %s' % self.config['ConfigVO']
                if not os.path.exists(self.config['ConfigVO']):
                    raise ConfigError('ConfigVO file not found: %s' % self.config['ConfigVO'])
                else:
                    msg += 'in %s.' % self.config['ConfigVO']
            else:
                logger.warning('ConfigVO configuration ignored by %s middleware. Set Config instead.' % self.middleware)
        # 2. vo attached in the voms proxy
        elif voms:
            msg += 'in voms proxy: %s.' % voms
        # 3. vo is given explicitly
        elif self.config['VirtualOrganisation']:
            submit_option = '--vo %s' % self.config['VirtualOrganisation']
            msg += 'in Ganga config: %s.' % self.config['VirtualOrganisation']
        # 4. no vo information is found
        else:
            logger.warning('No Virtual Organisation specified in the configuration. The plugin has been disabeled.')
            return None

        # general WMS options
        # NB. please be aware the config for gLite WMS is NOT compatible with the config for EDG RB
        #     although both shares the same command option: '--config'
        if self.config['Config']:
            submit_option += ' --config %s' % self.config['Config']

        # the surrounding blanks let callers append the result directly to a command
        submit_option = ' %s ' % submit_option

        logger.debug(msg)

        return submit_option

    def __resolve_gridcmd_log_path__(self, regxp_logfname, cmd_output):
        '''Extract a log file path from command output.

        regxp_logfname -- regular expression whose first group is the path
        cmd_output     -- output text of a grid command

        Returns the matched path, or None when the expression does not match.
        '''
        match_log = re.search(regxp_logfname, cmd_output)

        logfile = None
        if match_log:
            logfile = match_log.group(1)
        return logfile

    def __clean_gridcmd_log__(self, regxp_logfname, cmd_output):
        '''Remove the command log file named in cmd_output, if it exists.

        Always returns True.
        '''
        logfile = self.__resolve_gridcmd_log_path__(regxp_logfname, cmd_output)

        if logfile and os.path.exists(logfile):
            os.remove(logfile)

        return True

    def __print_gridcmd_log__(self, regxp_logfname, cmd_output):
        '''Log, as warnings, the command log file named in cmd_output.

        The log file is removed afterwards.  When no existing log file can
        be resolved, the raw command output is logged instead.
        '''
        logfile = self.__resolve_gridcmd_log_path__(regxp_logfname, cmd_output)

        # the path comes from parsed text, so make sure it really exists before opening it
        if logfile and os.path.exists(logfile):
            f = open(logfile, 'r')
            try:
                for l in f.readlines():
                    logger.warning(l.strip())
            finally:
                f.close()

            ## here we assume the logfile is no longer needed at this point - remove it
            os.remove(logfile)
        else:
            logger.warning('output\n%s\n', cmd_output)
            logger.warning('end of output')

    def __get_proxy_voname__(self):
        '''Return the voms attribute of the credential (always None for EDG).'''
        # for EDG, we never check it
        if self.middleware == 'EDG':
            return None
        else:
            logger.debug('voms of credential: %s' % self.credential.voms)
            return self.credential.voms

    def __get_lfc_host__(self):
        '''Gets the LFC_HOST: from current shell or querying BDII on demand'''
        lfc_host = None

        if 'LFC_HOST' in self.shell.env:
            lfc_host = self.shell.env['LFC_HOST']

        if not lfc_host:
            lfc_host = self.__get_default_lfc__()

        return lfc_host

    def __get_default_lfc__(self):
        '''Gets the default lfc host from lcg-infosites

        Returns the first line of the command output, or None when the
        command exits with a non-zero code.
        '''
        cmd = 'lcg-infosites'

        logger.debug('%s lfc-infosites called ...' % self.middleware)

        rc, output, m = self.shell.cmd1('%s --vo %s lfc' % (cmd, self.config['VirtualOrganisation']), allowed_exit=[0, 255])

        if rc != 0:
            return None
        else:
            lfc_list = output.strip().split('\n')
            return lfc_list[0]

    def __resolve_no_matching_jobs__(self, cmd_output):
        '''Parsing the glite-wms-job-status log to get the glite jobs which have been removed from the WMS

        Returns the list of job ids for which the log reports
        'no matching jobs found'; the list is empty when no log file can
        be read.
        '''
        logfile = self.__resolve_gridcmd_log_path__(r'(.*-job-status.*\.log)', cmd_output)

        glite_ids = []

        # the path comes from parsed text, so make sure it really exists before opening it
        if logfile and os.path.exists(logfile):

            f = open(logfile, 'r')
            try:
                output = f.readlines()
            finally:
                f.close()

            re_jid = re.compile(r'^Unable to retrieve the status for: (https:\/\/\S+:9000\/[0-9A-Za-z_\.\-]+)\s*$')
            re_key = re.compile(r'^.*(no matching jobs found)\s*$')

            # a job id is remembered until the next 'no matching jobs found' line confirms it
            myjid = ''
            for line in output:
                m_jid = re_jid.match(line)
                if m_jid:
                    myjid = m_jid.group(1)

                if myjid:
                    if re_key.match(line):
                        glite_ids.append(myjid)
                        myjid = ''

        return glite_ids

    def check_proxy(self):
        '''Check the proxy and prompt the user to refresh it

        Returns True when the credential could be renewed, False otherwise.
        '''
        if self.credential is None:
            self.credential = getCredential('GridProxy', self.middleware)

        if self.middleware == 'GLITE':
            # NOTE(review): voms is set on the current credential object and the
            # credential is then fetched again; this only has an effect if
            # getCredential() hands back the same object - confirm.
            self.credential.voms = self.config['VirtualOrganisation']
            self.credential = getCredential('GridProxy', 'GLITE')

        status = self.credential.renew(maxTry=3)

        if not status:
            logger.warning("Could not get a proxy, giving up after 3 retries")
            return False

        return True

    def list_match(self, jdlpath, ce=None):
        '''Returns a list of computing elements can run the job

        jdlpath -- path of the JDL file
        ce      -- optional CE; when given, the result is reduced to [ce]
                   if that CE matched and to [] otherwise

        Returns None when the plugin is inactive or the proxy has less
        than one hour left.
        '''
        re_ce = re.compile(r'^\s*\-\s*(\S+\:2119\/\S+)\s*$')

        matched_ces = []

        if self.middleware == 'EDG':
            cmd = 'edg-job-list-match'
            exec_bin = False
        else:
            cmd = 'glite-wms-job-list-match -a'
            exec_bin = True

        if not self.active:
            logger.warning('LCG plugin not active.')
            return

        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return

        submit_opt = self.__set_submit_option__()

        if not submit_opt:
            return matched_ces
        else:
            cmd += submit_opt

        cmd = '%s --noint %s' % (cmd, jdlpath)

        logger.debug('job list match command: %s' % cmd)

        rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])

        for l in output.split('\n'):

            matches = re_ce.match(l)

            if matches:
                matched_ces.append(matches.group(1))

        if ce:
            if ce in matched_ces:
                matched_ces = [ce]
            else:
                matched_ces = []

        logger.debug('== matched CEs ==')
        for myce in matched_ces:
            logger.debug(myce)
        logger.debug('== matched CEs ==')

        return matched_ces

    def submit(self, jdlpath, ce=None):
        '''Submit a JDL file to LCG

        jdlpath -- path of the JDL file
        ce      -- optional CE passed to the command with '-r'

        Returns the job id parsed from the command output, or None when
        the submission did not take place or failed.
        '''
        ## doing job submission
        if self.middleware == 'EDG':
            cmd = 'edg-job-submit'
            exec_bin = False
        else:
            cmd = 'glite-wms-job-submit -a'
            exec_bin = True

        if not self.active:
            logger.warning('LCG plugin not active.')
            return

        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return

        submit_opt = self.__set_submit_option__()

        if not submit_opt:
            return
        else:
            cmd += submit_opt

        if ce:
            cmd = cmd + ' -r %s' % ce

        cmd = '%s --nomsg %s < /dev/null' % (cmd, jdlpath)

        logger.debug('job submit command: %s' % cmd)

        rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd),
                                        allowed_exit=[0, 255],
                                        timeout=self.config['SubmissionTimeout'])

        if output: output = "%s" % output.strip()

        match = re.search(r'.*(https:\/\/\S+:9000\/[0-9A-Za-z_\.\-]+)', output)

        if match:
            logger.debug('job id: %s' % match.group(1))
            if self.middleware == 'GLITE' and self.perusable:
                logger.info("Enabling perusal")
                # the outcome of the perusal command is not checked
                per_rc, per_out, per_m = self.shell.cmd1("glite-wms-job-perusal --set -f stdout %s" % match.group(1))

            ## remove the glite command log if it exists
            self.__clean_gridcmd_log__(r'(.*-job-submit.*\.log)', output)
            return match.group(1)

        else:
            logger.warning('Job submission failed.')
            self.__print_gridcmd_log__(r'(.*-job-submit.*\.log)', output)
            return

    def native_master_cancel(self, jobids):
        '''Native bulk cancellation supported by GLITE middleware.

        Returns True on success, False otherwise (always False for EDG).
        '''
        if self.middleware == 'EDG':
            logger.warning("EDG middleware doesn't support bulk cancellation.")
            return False
        else:
            cmd = 'glite-wms-job-cancel'
            exec_bin = True

        if not self.active:
            logger.warning('LCG plugin not active.')
            return False

        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return False

        if not self.__set_submit_option__():
            return False

        # the id file is written only after all checks passed, so nothing is left behind on early exit
        idsfile = self.__write_jobids_file__(jobids)
        try:
            cmd = '%s --noint -i %s' % (cmd, idsfile)

            logger.debug('job cancel command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])
        finally:
            self.__remove_file__(idsfile)

        if rc != 0:
            logger.warning('Job cancellation failed.')
            self.__print_gridcmd_log__(r'(.*-job-cancel.*\.log)', output)
            return False
        else:
            # job cancellation succeeded, try to remove the glite command logfile if it exists
            self.__clean_gridcmd_log__(r'(.*-job-cancel.*\.log)', output)
            return True

    def status(self, jobids, is_collection=False):
        '''Query the status of jobs on the grid

        jobids        -- list of job ids to query
        is_collection -- when True, '-v 3' is added to the glite command

        Returns a tuple (info, missing_glite_jids).  info is a list of
        dictionaries with the keys 'id', 'name', 'is_node', 'status',
        'exit', 'reason' and 'destination'.  missing_glite_jids lists the
        job ids reported as no longer known to the WMS.
        '''
        if not jobids: return ([], [])

        if self.middleware == 'EDG':
            cmd = 'edg-job-status'
            exec_bin = False
        else:
            cmd = 'glite-wms-job-status'

            exec_bin = True
            if self.config['IgnoreGliteScriptHeader']:
                exec_bin = False

            # NOTE(review): '-v 3' is applied to the glite command only; the
            # listing this was recovered from lost its indentation - confirm.
            if is_collection:
                cmd = '%s -v 3' % cmd

        if not self.active:
            logger.warning('LCG plugin not active.')
            return ([], [])
        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return ([], [])

        # the id file is written only after all checks passed, so nothing is left behind on early exit
        idsfile = self.__write_jobids_file__(jobids)
        try:
            cmd = '%s --noint -i %s' % (cmd, idsfile)
            logger.debug('job status command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd),
                                            allowed_exit=[0, 255],
                                            timeout=self.config['StatusPollingTimeout'])
        finally:
            self.__remove_file__(idsfile)

        missing_glite_jids = []
        if rc != 0:
            missing_glite_jids = self.__resolve_no_matching_jobs__(output)

            if missing_glite_jids:
                logger.info("some jobs removed from WMS, will set corresponding Ganga job to 'failed' status")
                logger.debug('jobs removed from WMS: %s' % repr(missing_glite_jids))
            else:
                self.__print_gridcmd_log__(r'(.*-job-status.*\.log)', output)

        ## job status query succeeded, try to remove the glite command logfile if it exists
        self.__clean_gridcmd_log__(r'(.*-job-status.*\.log)', output)

        re_id = re.compile(r'^\s*Status info for the Job : (https://.*\S)\s*$')
        re_status = re.compile(r'^\s*Current Status:\s+(.*\S)\s*$')

        ## from glite UI version 1.5.14, the attribute 'Node Name:' is no longer available
        ## for distinguishing master and node jobs. A new way has to be applied.
        re_exit = re.compile(r'^\s*Exit code:\s+(.*\S)\s*$')
        re_reason = re.compile(r'^\s*Status Reason:\s+(.*\S)\s*$')
        re_dest = re.compile(r'^\s*Destination:\s+(.*\S)\s*$')

        ## pattern to distinguish master and node jobs
        re_master = re.compile(r'^BOOKKEEPING INFORMATION:\s*$')
        re_node = re.compile(r'^- Nodes information.*\s*$')

        ## pattern for node jobs
        re_nodename = re.compile(r'^\s*NodeName\s*=\s*"(gsj_[0-9]+)";\s*$')

        # per-job fields filled from the lines following a job header
        field_patterns = ((re_status, 'status'),
                          (re_exit, 'exit'),
                          (re_reason, 'reason'),
                          (re_dest, 'destination'))

        info = []
        is_node = False
        for line in output.split('\n'):

            if re_master.match(line):
                is_node = False
                continue

            if re_node.match(line):
                is_node = True
                continue

            match = re_id.match(line)
            if match:
                info.append({'id': match.group(1),
                             'name': '',
                             'is_node': is_node,
                             'status': '',
                             'exit': '',
                             'reason': '',
                             'destination': ''})
                continue

            # attribute lines seen before any job header have no record to go into
            if not info:
                continue

            match = re_nodename.match(line)
            if match and is_node:
                info[-1]['name'] = match.group(1)
                continue

            for pattern, key in field_patterns:
                match = pattern.match(line)
                if match:
                    info[-1][key] = match.group(1)
                    break

        return (info, missing_glite_jids)

    def get_loginfo(self, jobids, directory, verbosity=1):
        '''Fetch the logging info of the given job and save the output in the job's outputdir

        jobids    -- list of job ids
        directory -- directory in which '__jobloginfo__.log' is written
        verbosity -- integer passed to the command with '-v'

        Returns the path of the saved logging info, or False on failure.
        '''
        if self.middleware == 'EDG':
            cmd = 'edg-job-get-logging-info -v %d' % verbosity
            exec_bin = False
        else:
            cmd = 'glite-wms-job-logging-info -v %d' % verbosity

            exec_bin = True
            if self.config['IgnoreGliteScriptHeader']:
                exec_bin = False

        if not self.active:
            logger.warning('LCG plugin not active.')
            return False
        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return False

        log_output = directory + '/__jobloginfo__.log'

        # the id file is written only after all checks passed, so nothing is left behind on early exit
        idsfile = self.__write_jobids_file__(jobids)
        try:
            cmd = '%s --noint -o %s -i %s' % (cmd, log_output, idsfile)

            logger.debug('job logging info command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])
        finally:
            self.__remove_file__(idsfile)

        if rc != 0:
            self.__print_gridcmd_log__(r'(.*-logging-info.*\.log)', output)
            return False
        else:
            # logging-info checking succeeded, try to remove the glite command logfile if it exists
            self.__clean_gridcmd_log__(r'(.*-logging-info.*\.log)', output)
            # returns the path to the saved logging info if success
            return log_output

    def get_output(self, jobid, directory, wms_proxy=False):
        '''Retrieve the output of a job on the grid

        jobid     -- id of the job
        directory -- directory receiving the output
        wms_proxy -- not used by this method

        Returns (False, None) when the plugin is inactive or the proxy is
        too short, (False, 'cannot fetch job output') when the command
        output cannot be parsed, and the result of __get_app_exitcode__()
        otherwise.
        '''
        if self.middleware == 'EDG':
            cmd = 'edg-job-get-output'
            exec_bin = False
        else:
            cmd = 'glite-wms-job-output'
            exec_bin = True
            # general WMS options (somehow used by the glite-wms-job-output command)
            # NOTE(review): applied to the glite command only; the listing this
            # was recovered from lost its indentation - confirm.
            if self.config['Config']:
                cmd += ' --config %s' % self.config['Config']

        if not self.active:
            logger.warning('LCG plugin is not active.')
            return (False, None)
        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return (False, None)

        cmd = '%s --noint --dir %s %s' % (cmd, directory, jobid)

        logger.debug('job get output command: %s' % cmd)

        rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])

        match = re.search(r'directory:\n\s*(\S+)\s*\n', output)

        if not match:
            logger.warning('Job output fetch failed.')
            self.__print_gridcmd_log__(r'(.*-output.*\.log)', output)
            return (False, 'cannot fetch job output')

        # job output fetching succeeded, try to remove the glite command logfile if it exists
        self.__clean_gridcmd_log__(r'(.*-output.*\.log)', output)

        outdir = match.group(1)

        # some versions of LCG middleware create an extra output directory (named <uid>_<jid_hash>)
        # inside the job.outputdir. Try to match the jid_hash in the outdir. Do output movement
        # if the <jid_hash> is found in the path of outdir.
        try:
            from urlparse import urlparse
        except ImportError:
            # location of the same function on Python 3
            from urllib.parse import urlparse
        # path component of the job id URL without its leading '/'
        jid_hash = urlparse(jobid)[2][1:]

        if outdir.count(jid_hash):
            if self.shell.system('mv %s/* %s' % (outdir, directory)) == 0:
                try:
                    os.rmdir(outdir)
                except Exception:
                    # sys.exc_info() keeps this valid on both old and new interpreters
                    msg = sys.exc_info()[1]
                    logger.warning("Error trying to remove the empty directory %s:\n%s" % (outdir, msg))
            else:
                logger.warning("Error moving output from %s to %s.\nOutput is left in %s." % (outdir, directory, outdir))

        return self.__get_app_exitcode__(directory)

    def cancelMultiple(self, jobids):
        '''Cancel multiple jobs in one LCG job cancellation call

        Returns True on success (or when jobids is empty), False otherwise.
        '''
        if not jobids: return True

        # do the cancellation using a proper LCG command
        if self.middleware == 'EDG':
            cmd = 'edg-job-cancel'
            exec_bin = False
        else:
            cmd = 'glite-wms-job-cancel'
            exec_bin = True

        if not self.active:
            logger.warning('LCG plugin is not active.')
            return False
        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return False

        # compose a temporary file with job ids in it
        idsfile = self.__write_jobids_file__(jobids)
        try:
            # compose the cancel command
            cmd = '%s --noint -i %s' % (cmd, idsfile)

            logger.debug('job cancel command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])
        finally:
            self.__remove_file__(idsfile)

        if rc == 0:
            # job cancelling succeeded, try to remove the glite command logfile if it exists
            self.__clean_gridcmd_log__(r'(.*-job-cancel.*\.log)', output)
            return True
        else:
            logger.warning("Failed to cancel jobs.\n%s" % output)
            self.__print_gridcmd_log__(r'(.*-job-cancel.*\.log)', output)
            return False

    def cancel(self, jobid):
        '''Cancel a job

        Returns True on success, False otherwise.
        '''
        if self.middleware == 'EDG':
            cmd = 'edg-job-cancel'
            exec_bin = False
        else:
            cmd = 'glite-wms-job-cancel'
            exec_bin = True

        if not self.active:
            logger.warning('LCG plugin is not active.')
            return False
        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return False

        cmd = '%s --noint %s' % (cmd, jobid)

        logger.debug('job cancel command: %s' % cmd)

        rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])

        if rc == 0:
            # job cancelling succeeded, try to remove the glite command logfile if it exists
            self.__clean_gridcmd_log__(r'(.*-job-cancel.*\.log)', output)
            return True
        else:
            logger.warning("Failed to cancel job %s.\n%s" % (jobid, output))
            self.__print_gridcmd_log__(r'(.*-job-cancel.*\.log)', output)
            return False

    def __cream_parse_job_status__(self, log):
        '''Parsing job status report from CREAM CE status query

        log -- output text of the status command; job sections are
               separated by '******'

        Returns a dictionary keyed by job id.  Each value maps attribute
        names to their values, plus a 'Timestamps' dictionary mapping
        status names to integer timestamps when status changes are listed.
        '''
        jobInfoDict = {}

        re_jid = re.compile(r'^\s+JobID\=\[(https:\/\/.*[:0-9]?\/CREAM.*)\]$')
        re_log = re.compile(r'^\s+(\S+.*\S+)\s+\=\s+\[(.*)\]$')

        re_jts = re.compile(r'^\s+Job status changes:$')
        re_ts = re.compile(r'^\s+Status\s+\=\s+\[(.*)\]\s+\-\s+\[(.*)\]\s+\(([0-9]+)\)$')
        re_cmd = re.compile(r'^\s+Issued Commands:$')

        ## in case of status retrieval failed
        re_jnf = re.compile(r'^.*job not found.*$')

        for jlog in log.split('******')[1:]:

            # every section describes one job: never reuse the id of the previous section
            jid = None

            # lines are matched unstripped: all patterns rely on the leading whitespace
            for l in jlog.split('\n'):

                m = re_jid.match(l)
                if m:
                    jid = m.group(1)
                    jobInfoDict[jid] = {}
                    continue

                if re_jnf.match(l):
                    break

                if re_cmd.match(l):
                    break

                # anything else is job data, which needs a job id to be stored under
                if jid is None:
                    continue

                m = re_log.match(l)
                if m:
                    jobInfoDict[jid][m.group(1)] = m.group(2)
                    continue

                if re_jts.match(l):
                    jobInfoDict[jid]['Timestamps'] = {}
                    continue

                m = re_ts.match(l)
                if m:
                    # setdefault tolerates a timestamp line without the preceding heading line
                    jobInfoDict[jid].setdefault('Timestamps', {})[m.group(1)] = int(m.group(3))
                    continue

        return jobInfoDict

    def __cream_ui_check__(self):
        '''checking if CREAM CE environment is set properly

        Returns True only for an active GLITE object whose proxy is valid
        for at least one more hour.
        '''
        if self.middleware.upper() != 'GLITE':
            logger.warning('CREAM CE job operation not supported')
            return False

        if not self.active:
            logger.warning('LCG plugin not active.')
            return False

        if not self.credential.isValid('01:00'):
            logger.warning('GRID proxy lifetime shorter than 1 hour')
            return False

        return True

    def cream_check_delegated_proxy(self, ce):
        '''
        Checking if the delegation id to given CE is still valid.

        Returns the stored delegation id when it has more than 1800
        seconds left, None otherwise (including when nothing is stored
        for ce).
        '''
        delid = None

        try:
            delid, t_expire = self.proxy_id[ce]

            ## TODO: implement the check of the validity of the delegation id
            now = time.time()

            ## check if the lifetime of the existing proxy still longer than 1800 seconds
            if t_expire - now <= 1800:
                logger.debug('existing proxy going to expire in 1800 seconds.')
                delid = None
            else:
                logger.debug('reusing valid proxy %s on %s' % (delid, ce))

        except KeyError:
            # no delegation recorded for this CE
            pass

        return delid

    def cream_proxy_delegation(self, ce):
        '''CREAM CE proxy delegation

        Reuses the delegation recorded for ce when it is still valid,
        otherwise delegates a new proxy.

        Returns the delegation id, '' when the delegation command failed,
        or None when the UI check fails or no ce is given.
        '''
        if not self.__cream_ui_check__():
            return

        if not ce:
            logger.warning('No CREAM CE endpoint specified')
            return

        mydelid = self.cream_check_delegated_proxy(ce)

        if not mydelid:

            logger.debug('making new proxy delegation to %s' % ce)

            exec_bin = True

            cmd = 'glite-ce-delegate-proxy'

            cmd += ' -e %s' % ce.split('/cream')[0]

            mydelid = '%s_%s' % (self.credential.identity(), get_uuid())

            cmd = '%s %s' % (cmd, mydelid)

            logger.debug('proxy delegation command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd),
                                            allowed_exit=[0, 255],
                                            timeout=self.config['SubmissionTimeout'])
            if rc != 0:
                ## failed to delegate proxy
                logger.error('proxy delegation error: %s' % output)
                mydelid = ''
            else:
                ## proxy delegated successfully
                ## NB: expiration time is "current time" + "credential lifetime"
                t_expire = time.time() + float(self.credential.timeleft(units="seconds", force_check=True))

                logger.debug('new proxy at %s valid until %s' % (ce, time.ctime(t_expire)))

                # NOTE(review): only successful delegations are recorded; the
                # listing this was recovered from lost its indentation - confirm.
                self.proxy_id[ce] = [mydelid, t_expire]

        return mydelid

    def cream_submit(self, jdlpath, ce):
        '''CREAM CE direct job submission

        Returns the job id, or None when the submission did not take
        place or failed.
        '''
        if not self.__cream_ui_check__():
            return

        if not ce:
            logger.warning('No CREAM CE endpoint specified')
            return

        cmd = 'glite-ce-job-submit'
        exec_bin = True

        mydelid = self.cream_proxy_delegation(ce)

        # use the delegated proxy when there is one, otherwise pass '-a' instead
        if mydelid:
            cmd = cmd + ' -D %s' % mydelid
        else:
            cmd = cmd + ' -a'

        cmd = cmd + ' -r %s' % ce

        cmd = '%s --nomsg %s < /dev/null' % (cmd, jdlpath)

        logger.debug('job submit command: %s' % cmd)

        rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd),
                                        allowed_exit=[0, 255],
                                        timeout=self.config['SubmissionTimeout'])

        if output: output = "%s" % output.strip()

        # the stripped output must consist of the job id only
        match = re.search(r'^(https:\/\/\S+:8443\/[0-9A-Za-z_\.\-]+)$', output)

        if match:
            logger.debug('job id: %s' % match.group(1))
            return match.group(1)
        else:
            logger.warning('Job submission failed.')
            return

    def cream_status(self, jobids):
        '''CREAM CE job status query

        Returns the dictionary built by __cream_parse_job_status__(); it
        is empty when the query did not take place or failed.
        '''
        # every exit returns a dictionary so that callers can treat the result uniformly
        if not self.__cream_ui_check__():
            return {}

        if not jobids: return {}

        idsfile = self.__write_jobids_file__(jobids, header='##CREAMJOBS##\n')
        try:
            cmd = 'glite-ce-job-status'
            exec_bin = True

            cmd = '%s -L 2 -n -i %s' % (cmd, idsfile)
            logger.debug('job status command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd),
                                            allowed_exit=[0, 255],
                                            timeout=self.config['StatusPollingTimeout'])
        finally:
            self.__remove_file__(idsfile)

        jobInfoDict = {}
        if rc == 0 and output:
            jobInfoDict = self.__cream_parse_job_status__(output)

        return jobInfoDict

    def cream_purgeMultiple(self, jobids):
        '''CREAM CE job purging

        Returns True when the purge command exits with code 0.
        '''
        if not self.__cream_ui_check__():
            return False

        idsfile = self.__write_jobids_file__(jobids, header='##CREAMJOBS##\n')
        try:
            cmd = 'glite-ce-job-purge'
            exec_bin = True

            cmd = '%s -n -N -i %s' % (cmd, idsfile)

            logger.debug('job purge command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])
        finally:
            self.__remove_file__(idsfile)

        logger.debug(output)

        return rc == 0

    def cream_cancelMultiple(self, jobids):
        '''CREAM CE job cancelling

        Returns True when the cancel command exits with code 0.
        '''
        if not self.__cream_ui_check__():
            return False

        idsfile = self.__write_jobids_file__(jobids, header='##CREAMJOBS##\n')
        try:
            cmd = 'glite-ce-job-cancel'
            exec_bin = True

            cmd = '%s -n -N -i %s' % (cmd, idsfile)

            logger.debug('job cancel command: %s' % cmd)

            rc, output, m = self.shell.cmd1('%s%s' % (self.__get_cmd_prefix_hack__(binary=exec_bin), cmd), allowed_exit=[0, 255])
        finally:
            self.__remove_file__(idsfile)

        logger.debug(output)

        return rc == 0

    def cream_get_output(self, osbURIList, directory):
        '''CREAM CE job output retrieval

        osbURIList -- URIs of the output sandbox files
        directory  -- destination directory

        Returns (False, None) when the UI check fails, otherwise the
        result of GridftpSandboxCache.download().
        '''
        if not self.__cream_ui_check__():
            return (False, None)

        gfiles = []
        for uri in osbURIList:
            gf = GridftpFileIndex()
            gf.id = uri
            gfiles.append(gf)

        cache = GridftpSandboxCache()
        cache.middleware = 'GLITE'
        cache.vo = self.config['VirtualOrganisation']
        cache.uploaded_files = gfiles

        # a real list (not a lazy map object) is passed on every interpreter version
        return cache.download(files=[gf.id for gf in gfiles], dest_dir=directory)

    def __get_app_exitcode__(self, outputdir):
        '''Unpack the output sandbox and read the application exit code.

        The code is taken from the first line of '__jobscript__.log' in
        outputdir that matches 'exit code <digits>'.

        Returns (True, 0) when the exit code is 0, (False, <exit code>)
        otherwise (the code is -1 when no line matches), and
        (False, <message>) when the runtime log does not exist.
        '''
        import Ganga.Core.Sandbox as Sandbox

        Sandbox.getPackedOutputSandbox(outputdir, outputdir)

        ## check the application exit code
        app_exitcode = -1
        runtime_log = os.path.join(outputdir, '__jobscript__.log')
        pat = re.compile(r'.*exit code (\d+).')

        if not os.path.exists(runtime_log):
            logger.warning('job runtime log not found: %s' % runtime_log)
            return (False, 'job runtime log not found: %s' % runtime_log)

        f = open(runtime_log, 'r')
        try:
            for line in f.readlines():
                mat = pat.match(line)
                if mat:
                    # int() instead of eval(): the text comes from a file, and
                    # zero-padded digits must not be read as octal
                    app_exitcode = int(mat.group(1))
                    break
        finally:
            f.close()

        ## returns False if the exit code of the real executable is not zero
        ## the job status of GANGA will be changed to 'failed' if the return value is False
        if app_exitcode != 0:
            logger.debug('job\'s executable returns non-zero exit code: %d' % app_exitcode)
            return (False, app_exitcode)
        else:
            return (True, 0)

    def expandjdl(items):
        '''Expand jdl items

        items -- dictionary mapping JDL attribute names to values

        Returns the JDL text, one 'key = value;' statement per item.
        Integer attributes that cannot be converted to a non-negative
        integer are skipped with a warning.
        '''
        text = ""
        for key, value in items.items():

            if key == 'Requirements':
                if value: text += 'Requirements = \n %s;\n' % ' &&\n '.join(value)

            elif key in ['ShallowRetryCount', 'RetryCount', 'NodeNumber', 'ExpiryTime', 'PerusalTimeInterval']:
                try:
                    value = int(value)
                    if value < 0: raise ValueError
                    text += '%s = %d;\n' % (key, value)
                except (ValueError, TypeError):
                    # TypeError covers values such as None that int() cannot convert
                    logger.warning('%s is not positive integer.' % key)

            elif key == 'Environment':
                if value: text += 'Environment = {\n "%s"\n};\n' % '",\n "'.join(['%s=\'%s\'' % var for var in value.items()])

            elif isinstance(value, list):
                if value: text += '%s = {\n "%s"\n};\n' % (key, '",\n "'.join(value))

            elif key == 'Rank':
                text += 'Rank = ( %s );\n' % value

            elif key == 'Nodes':
                text += 'Nodes = %s;\n' % value

            elif key in ['PerusalFileEnable', 'AllowZippedISB']:
                text += '%s = %s;\n' % (key, value)

            elif key in ['DataRequirements']:
                text += 'DataRequirements = { %s };\n' % value
            else:
                text += '%s = "%s";\n' % (key, value)

        return text

    expandjdl = staticmethod(expandjdl)
1046