1
2 import os
3 import os.path
4 import math
5 import re
6
7 from urlparse import urlparse
8
9 from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm
10 from Ganga.Core import GangaException
11
12 from Ganga.GPIDev.Schema import *
13 from Ganga.GPIDev.Lib.File import *
14 from Ganga.GPIDev.Adapters.IBackend import IBackend
15 from Ganga.Utility.Config import getConfig
16 from Ganga.Utility.logging import getLogger, log_user_exception
17 from Ganga.Lib.LCG.Utility import *
18 from Ganga.Lib.LCG.ElapsedTimeProfiler import ElapsedTimeProfiler
19
20 from Ganga.Lib.LCG.Grid import Grid
21 from Ganga.Lib.LCG.LCG import grids
22 from Ganga.Lib.LCG.GridftpSandboxCache import GridftpSandboxCache
23
25
# --- fragment: the enclosing 'def' line is elided from this listing ---
# Builds the list of output-sandbox URIs by scanning the 'OutputSandbox'
# attribute inside a JDL string.
# NOTE(review): `jdl` (the JDL text) and `job` (the Ganga job object) come
# from the elided enclosing scope -- confirm against the full source.
26 osbURIList = []
27
# captures the brace-delimited file list of a JDL statement, e.g.
#   OutputSandbox = { "stdout", "stderr" }
28 re_osb = re.compile('^.*OutputSandbox\s+\=\s+\{(.*)\}\s?\]?$')
29
30 for l in jdl.split(';'):
31 m = re_osb.match( l )
32 if m:
33 osb = m.group(1)
# strip the double quotes (and adjacent blanks) around each entry
34 osb = re.sub(r'\s?\"\s?', '', osb)
35
36 for f in osb.split(','):
# entries without a URL scheme are resolved relative to the backend's osbURI
37 if not urlparse(f)[0]:
38 osbURIList.append('%s/%s' % ( job.backend.osbURI, os.path.basename(f)) )
39 else:
40 osbURIList.append(f)
# only the first matching OutputSandbox statement is used
41 break
42
43 return osbURIList
44
# --- fragment: the enclosing 'class' statement is elided from this listing;
# the lines below are class-level attributes of the CREAM backend ---
46 '''CREAM backend - direct job submission to gLite CREAM CE'''
# Declarative Ganga schema of backend attributes; items with
# protected=1/copyable=0 are middleware-managed status fields that users
# must not set themselves.
47 _schema = Schema(Version(1,0), {
48 'CE' : SimpleItem(defvalue='',doc='CREAM CE endpoint'),
49 'jobtype' : SimpleItem(defvalue='Normal',doc='Job type: Normal, MPICH'),
50 'requirements' : ComponentItem('LCGRequirements',doc='Requirements for the resource selection'),
51 'sandboxcache' : ComponentItem('GridSandboxCache',copyable=1,doc='Interface for handling oversized input sandbox'),
52 'id' : SimpleItem(defvalue='',typelist=['str','list'],protected=1,copyable=0,doc='Middleware job identifier'),
53 'status' : SimpleItem(defvalue='',typelist=['str','dict'], protected=1,copyable=0,doc='Middleware job status'),
54 'exitcode' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Application exit code'),
55 'exitcode_cream' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Middleware exit code'),
56 'actualCE' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The CREAM CE where the job actually runs.'),
57 'reason' : SimpleItem(defvalue='',protected=1,copyable=0,doc='Reason of causing the job status'),
58 'workernode' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The worker node on which the job actually runs.'),
59 'isbURI' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The input sandbox URI on CREAM CE'),
60 'osbURI' : SimpleItem(defvalue='',protected=1,copyable=0,doc='The output sandbox URI on CREAM CE')
61 })
62
# registers this plugin under the 'backends' category with name 'CREAM'
63 _category = 'backends'
64
65 _name = 'CREAM'
66
# --- fragment: constructor body; the 'def __init__' line is elided ---
68 super(CREAM, self).__init__()
69
70
# Dynamically load a site-specific LCGRequirements subclass named by the
# 'Requirements' config option (a 'package.module.ClassName' dotted path:
# the full path is imported, the last component is the class name).
71 try:
72 reqName1 = config['Requirements']
73 reqName = config['Requirements'].split('.').pop()
74 reqModule = __import__(reqName1, globals(), locals(), [reqName1])
75 reqClass = vars(reqModule)[reqName]
76 self.requirements = reqClass()
77
78 logger.debug('load %s as LCGRequirements' % reqName)
# NOTE(review): bare 'except' deliberately falls back to the schema
# default, but it also hides genuine import/config errors
79 except:
80 logger.debug('load default LCGRequirements')
81 pass
82
83
84
# Same dynamic-loading pattern for the sandbox-cache implementation;
# GridftpSandboxCache is the default when the config lookup/import fails.
85 self.sandboxcache = GridftpSandboxCache()
86 try:
87 scName1 = config['SandboxCache']
88 scName = config['SandboxCache'].split('.').pop()
89 scModule = __import__(scName1, globals(), locals(), [scName1])
90 scClass = vars(scModule)[scName]
91 self.sandboxcache = scClass()
92 logger.debug('load %s as SandboxCache' % scName)
93 except:
94 logger.debug('load default SandboxCache')
95 pass
96
107
# --- fragment: sandbox-cache configuration method; 'def' line elided.
# Returns True on success, False when no usable baseURI can be derived. ---
109 '''Sets up the sandbox cache object to adopt the runtime configuration of the LCG backend'''
110
# 'token:<srm_token>:<se_host>' syntax optionally used in the DefaultSE
# config value
111 re_token = re.compile('^token:(.*):(.*)$')
112
113 self.sandboxcache.vo = config['VirtualOrganisation']
114 self.sandboxcache.middleware = 'GLITE'
115 self.sandboxcache.timeout = config['SandboxTransferTimeout']
116
# LCG cache: fill in the LFC host and the default SE / space token
# whenever the user left them unset
117 if self.sandboxcache._name == 'LCGSandboxCache':
118 if not self.sandboxcache.lfc_host:
119 self.sandboxcache.lfc_host = grids[self.middleware.upper()].__get_lfc_host__()
120
121 if not self.sandboxcache.se:
122
123 token = ''
124 se_host = config['DefaultSE']
125 m = re_token.match(se_host)
126 if m:
127 token = m.group(1)
128 se_host = m.group(2)
129
130 self.sandboxcache.se = se_host
131
132 if token:
133 self.sandboxcache.srm_token = token
134
135 if (self.sandboxcache.se_type in ['srmv2']) and (not self.sandboxcache.srm_token):
136 self.sandboxcache.srm_token = config['DefaultSRMToken']
137
# DQ2 cache (ATLAS): generate a dataset name once and propagate it to
# all subjobs so they share the same input dataset
138 elif self.sandboxcache._name == 'DQ2SandboxCache':
139
140
141 if not self.sandboxcache.dataset_name:
142 from GangaAtlas.Lib.ATLASDataset.DQ2Dataset import dq2outputdatasetname
143 self.sandboxcache.dataset_name,unused = dq2outputdatasetname("%s.input"%get_uuid(), 0, False, '')
144
145
# NOTE(review): `job` here comes from the elided enclosing scope --
# presumably self.getJobObject(); confirm against the full source
146 for sj in job.subjobs:
147 sj.backend.sandboxcache.dataset_name = self.sandboxcache.dataset_name
148
# gridftp cache: prefer the configured baseURI, else derive it from the
# CE endpoint (host without port, conventional CREAM sandbox path)
149 elif self.sandboxcache._name == 'GridftpSandboxCache':
150 if config['CreamInputSandboxBaseURI']:
151 self.sandboxcache.baseURI = config['CreamInputSandboxBaseURI']
152 elif self.CE:
153 ce_host = re.sub(r'\:[0-9]+','',self.CE.split('/cream')[0])
154 self.sandboxcache.baseURI = 'gsiftp://%s/opt/glite/var/cream_sandbox/%s' % ( ce_host, self.sandboxcache.vo )
155 else:
156 logger.error('baseURI not available for GridftpSandboxCache')
157 return False
158
159 return True
160
240
# --- fragment: bulk-prepare method; 'def' line elided.  Prepares each
# subjob's JDL in 10 worker threads and returns a {subjob id: jdl path}
# mapping, or None if the master sandbox or any subjob preparation fails. ---
242 '''preparing jobs in multiple threads'''
243
244 logger.warning('preparing %d subjobs ... it may take a while' % len(rjobs))
245
246
# the master job sandbox is prepared once and shared by all subjobs
247 master_input_sandbox=IBackend.master_prepare(self,masterjobconfig)
248
249
# prestage each master-sandbox file; abort on the first failure
250 for f in master_input_sandbox:
251 master_input_idx = self.__check_and_prestage_inputfile__(f)
252
253 if not master_input_idx:
# NOTE(review): 'perparation' typo lives in a runtime log string and is
# therefore left untouched here
254 logger.error('master input sandbox perparation failed: %s' % f)
255 return None
256
257
# per-thread work item: prepare one subjob's JDL and record its path
258 class MyAlgorithm(Algorithm):
259
260 def __init__(self):
261 Algorithm.__init__(self)
262
263 def process(self, sj_info):
264 my_sc = sj_info[0]
265 my_sj = sj_info[1]
266
267 try:
268 logger.debug("preparing job %s" % my_sj.getFQID('.'))
269 jdlpath = my_sj.backend.preparejob(my_sc, master_input_sandbox)
270
271 if (not jdlpath) or (not os.path.exists(jdlpath)):
272 raise GangaException('job %s not properly prepared' % my_sj.getFQID('.'))
273
274 self.__appendResult__( my_sj.id, jdlpath )
275 return True
276 except Exception,x:
277 log_user_exception()
278 return False
279
280 mt_data = []
281 for sc,sj in zip(subjobconfigs,rjobs):
282 mt_data.append( [sc, sj] )
283
284 myAlg = MyAlgorithm()
285 myData = Data(collection=mt_data)
286
# run the preparation concurrently and wait indefinitely for completion
287 runner = MTRunner(name='lcg_jprepare', algorithm=myAlg, data=myData, numThread=10)
288 runner.start()
289 runner.join(-1)
290
# all-or-nothing: any failed subjob aborts the whole bulk preparation
291 if len(runner.getDoneList()) < len(mt_data):
292 return None
293 else:
294
295 return runner.getResults()
296
# --- fragment: bulk-submit method; 'def' line elided.  Submits the
# prepared subjob JDLs concurrently to the CREAM CE and returns a
# {subjob id: middleware job id} mapping, or None (after cancelling the
# already-submitted jobs) if any submission fails. ---
298 '''submitting jobs in multiple threads'''
299
300 job = self.getJobObject()
301
302 logger.warning('submitting %d subjobs ... it may take a while' % len(node_jdls))
303
304
# per-thread work item: submit one JDL and record the returned job id
305 class MyAlgorithm(Algorithm):
306
307 def __init__(self, gridObj, masterInputWorkspace, ce):
308 Algorithm.__init__(self)
309 self.inpw = masterInputWorkspace
310 self.gridObj = gridObj
311 self.ce = ce
312
313 def process(self, jdl_info):
314 my_sj_id = jdl_info[0]
315 my_sj_jdl = jdl_info[1]
316
317 my_sj_jid = self.gridObj.cream_submit(my_sj_jdl, self.ce)
318
319 if not my_sj_jid:
320 return False
321 else:
322 self.__appendResult__( my_sj_id, my_sj_jid )
323 return True
324
325 mt_data = []
326 for id, jdl in node_jdls.items():
327 mt_data.append( (id, jdl) )
328
329 myAlg = MyAlgorithm(gridObj=grids['GLITE'],masterInputWorkspace=job.getInputWorkspace(), ce=self.CE)
330 myData = Data(collection=mt_data)
331
# thread count is user-configurable via the 'SubmissionThread' option
332 runner = MTRunner(name='cream_jsubmit', algorithm=myAlg, data=myData, numThread=config['SubmissionThread'])
333 runner.start()
334 runner.join(timeout=-1)
335
# all-or-nothing: on partial failure, cancel whatever was submitted
336 if len(runner.getDoneList()) < len(mt_data):
337
338 logger.error('some bulk jobs not successfully (re)submitted, canceling submitted jobs on WMS')
339 grids['GLITE'].cancelMultiple( runner.getResults().values() )
340 return None
341 else:
342 return runner.getResults()
343
345 '''Create job wrapper'''
346
347 script = """#!/usr/bin/env python
348 #-----------------------------------------------------
349 # This job wrapper script is automatically created by
350 # GANGA LCG backend handler.
351 #
352 # It controls:
353 # 1. unpack input sandbox
354 # 2. invoke application executable
355 # 3. invoke monitoring client
356 #-----------------------------------------------------
357 import os,os.path,shutil,tempfile
358 import sys,popen2,time,traceback
359
360 #bugfix #36178: subprocess.py crashes if python 2.5 is used
361 #try to import subprocess from local python installation before an
362 #import from PYTHON_DIR is attempted some time later
363 try:
364 import subprocess
365 except ImportError:
366 pass
367
368 ## Utility functions ##
369 def timeString():
370 return time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time()))
371
372 def printInfo(s):
373 out.write(timeString() + ' [Info]' + ' ' + str(s) + os.linesep)
374 out.flush()
375
376 def printError(s):
377 out.write(timeString() + ' [Error]' + ' ' + str(s) + os.linesep)
378 out.flush()
379
380 def lcg_file_download(vo,guid,localFilePath,timeout=60,maxRetry=3):
381 cmd = 'lcg-cp -t %d --vo %s %s file://%s' % (timeout,vo,guid,localFilePath)
382
383 printInfo('LFC_HOST set to %s' % os.environ['LFC_HOST'])
384 printInfo('lcg-cp timeout: %d' % timeout)
385
386 i = 0
387 rc = 0
388 isDone = False
389 try_again = True
390
391 while try_again:
392 i = i + 1
393 try:
394 ps = os.popen(cmd)
395 status = ps.close()
396
397 if not status:
398 isDone = True
399 printInfo('File %s download from iocache' % os.path.basename(localFilePath))
400 else:
401 raise IOError("Download file %s from iocache failed with error code: %d, trial %d." % (os.path.basename(localFilePath), status, i))
402
403 except IOError, e:
404 isDone = False
405 printError(str(e))
406
407 if isDone:
408 try_again = False
409 elif i == maxRetry:
410 try_again = False
411 else:
412 try_again = True
413
414 return isDone
415
416 ## system command executor with subprocess
417 def execSyscmdSubprocess(cmd, wdir=os.getcwd()):
418
419 import os, subprocess
420
421 global exitcode
422
423 outfile = file('stdout','w')
424 errorfile = file('stderr','w')
425
426 try:
427 child = subprocess.Popen(cmd, cwd=wdir, shell=True, stdout=outfile, stderr=errorfile)
428
429 while 1:
430 exitcode = child.poll()
431 if exitcode is not None:
432 break
433 else:
434 outfile.flush()
435 errorfile.flush()
436 monitor.progress()
437 time.sleep(0.3)
438 finally:
439 monitor.progress()
440
441 outfile.flush()
442 errorfile.flush()
443 outfile.close()
444 errorfile.close()
445
446 return True
447
448 ## system command executor with multi-thread
449 ## stderr/stdout handler
450 def execSyscmdEnhanced(cmd, wdir=os.getcwd()):
451
452 import os, threading
453
454 cwd = os.getcwd()
455
456 isDone = False
457
458 try:
459 ## change to the working directory
460 os.chdir(wdir)
461
462 child = popen2.Popen3(cmd,1)
463 child.tochild.close() # don't need stdin
464
465 class PipeThread(threading.Thread):
466
467 def __init__(self,infile,outfile,stopcb):
468 self.outfile = outfile
469 self.infile = infile
470 self.stopcb = stopcb
471 self.finished = 0
472 threading.Thread.__init__(self)
473
474 def run(self):
475 stop = False
476 while not stop:
477 buf = self.infile.read(10000)
478 self.outfile.write(buf)
479 self.outfile.flush()
480 time.sleep(0.01)
481 stop = self.stopcb()
482 #FIXME: should we do here?: self.infile.read()
483 #FIXME: this is to make sure that all the output is read (if more than buffer size of output was produced)
484 self.finished = 1
485
486 def stopcb(poll=False):
487 global exitcode
488 if poll:
489 exitcode = child.poll()
490 return exitcode != -1
491
492 out_thread = PipeThread(child.fromchild, sys.stdout, stopcb)
493 err_thread = PipeThread(child.childerr, sys.stderr, stopcb)
494
495 out_thread.start()
496 err_thread.start()
497 while not out_thread.finished and not err_thread.finished:
498 stopcb(True)
499 monitor.progress()
500 time.sleep(0.3)
501 monitor.progress()
502
503 sys.stdout.flush()
504 sys.stderr.flush()
505
506 isDone = True
507
508 except(Exception,e):
509 isDone = False
510
511 ## return to the original directory
512 os.chdir(cwd)
513
514 return isDone
515
516 ############################################################################################
517
518 ###INLINEMODULES###
519
520 ############################################################################################
521
522 ## Main program ##
523
524 outputsandbox = ###OUTPUTSANDBOX###
525 input_sandbox = ###INPUTSANDBOX###
526 wrapperlog = ###WRAPPERLOG###
527 appexec = ###APPLICATIONEXEC###
528 appargs = ###APPLICATIONARGS###
529 appenvs = ###APPLICATIONENVS###
530 timeout = ###TRANSFERTIMEOUT###
531
532 exitcode=-1
533
534 import sys, stat, os, os.path, commands
535
536 # Change to scratch directory if provided
537 scratchdir = ''
538 tmpdir = ''
539
540 orig_wdir = os.getcwd()
541
542 # prepare log file for job wrapper
543 out = open(os.path.join(orig_wdir, wrapperlog),'w')
544
545 if os.getenv('EDG_WL_SCRATCH'):
546 scratchdir = os.getenv('EDG_WL_SCRATCH')
547 elif os.getenv('TMPDIR'):
548 scratchdir = os.getenv('TMPDIR')
549
550 if scratchdir:
551 (status, tmpdir) = commands.getstatusoutput('mktemp -d %s/gangajob_XXXXXXXX' % (scratchdir))
552 if status == 0:
553 os.chdir(tmpdir)
554 else:
555 ## if status != 0, tmpdir should contains error message so print it to stderr
556 printError('Error making ganga job scratch dir: %s' % tmpdir)
557 printInfo('Unable to create ganga job scratch dir in %s. Run directly in: %s' % ( scratchdir, os.getcwd() ) )
558
559 ## reset scratchdir and tmpdir to disable the usage of Ganga scratch dir
560 scratchdir = ''
561 tmpdir = ''
562
563 wdir = os.getcwd()
564
565 if scratchdir:
566 printInfo('Changed working directory to scratch directory %s' % tmpdir)
567 try:
568 os.system("ln -s %s %s" % (os.path.join(orig_wdir, 'stdout'), os.path.join(wdir, 'stdout')))
569 os.system("ln -s %s %s" % (os.path.join(orig_wdir, 'stderr'), os.path.join(wdir, 'stderr')))
570 except Exception,e:
571 printError(sys.exc_info()[0])
572 printError(sys.exc_info()[1])
573 str_traceback = traceback.format_tb(sys.exc_info()[2])
574 for str_tb in str_traceback:
575 printError(str_tb)
576 printInfo('Linking stdout & stderr to original directory failed. Looking at stdout during job run may not be possible')
577
578 os.environ['PATH'] = '.:'+os.environ['PATH']
579
580 vo = os.environ['GANGA_LCG_VO']
581
582 try:
583 printInfo('Job Wrapper start.')
584
585 # download inputsandbox from remote cache
586 for f,guid in input_sandbox['remote'].iteritems():
587 if not lcg_file_download(vo, guid, os.path.join(wdir,f), timeout=int(timeout)):
588 raise Exception('Download remote input %s:%s failed.' % (guid,f) )
589 else:
590 getPackedInputSandbox(f)
591
592 printInfo('Download inputsandbox from iocache passed.')
593
594 # unpack inputsandbox from wdir
595 for f in input_sandbox['local']:
596 getPackedInputSandbox(os.path.join(orig_wdir,f))
597
598 printInfo('Unpack inputsandbox passed.')
599
600 printInfo('Loading Python modules ...')
601
602 sys.path.insert(0,os.path.join(wdir,PYTHON_DIR))
603
604 # check the python library path
605 try:
606 printInfo(' ** PYTHON_DIR: %s' % os.environ['PYTHON_DIR'])
607 except KeyError:
608 pass
609
610 try:
611 printInfo(' ** PYTHONPATH: %s' % os.environ['PYTHONPATH'])
612 except KeyError:
613 pass
614
615 for lib_path in sys.path:
616 printInfo(' ** sys.path: %s' % lib_path)
617
618 ###MONITORING_SERVICE###
619 monitor = createMonitoringObject()
620 monitor.start()
621
622 # execute application
623
624 ## convern appenvs into environment setup script to be 'sourced' before executing the user executable
625
626 printInfo('Prepare environment variables for application executable')
627
628 env_setup_script = os.path.join(os.getcwd(), '__ganga_lcg_env__.sh')
629
630 f = open( env_setup_script, 'w')
631 f.write('#!/bin/sh' + os.linesep )
632 f.write('##user application environmet setup script generated by Ganga job wrapper' + os.linesep)
633 for k,v in appenvs.items():
634
635 str_env = 'export %s="%s"' % (k, v)
636
637 printInfo(' ** ' + str_env)
638
639 f.write(str_env + os.linesep)
640 f.close()
641
642 try: #try to make shipped executable executable
643 os.chmod('%s/%s'% (wdir,appexec),stat.S_IXUSR|stat.S_IRUSR|stat.S_IWUSR)
644 except:
645 pass
646
647 status = False
648 try:
649 # use subprocess to run the user's application if the module is available on the worker node
650 import subprocess
651 printInfo('Load application executable with subprocess module')
652 status = execSyscmdSubprocess('source %s; %s %s' % (env_setup_script, appexec, appargs), wdir)
653 except ImportError,err:
654 # otherwise, use separate threads to control process IO pipes
655 printInfo('Load application executable with separate threads')
656 status = execSyscmdEnhanced('source %s; %s %s' % (env_setup_script, appexec, appargs), wdir)
657
658 os.system("cp %s/stdout stdout.1" % orig_wdir)
659 os.system("cp %s/stderr stderr.1" % orig_wdir)
660
661 printInfo('GZipping stdout and stderr...')
662
663 os.system("gzip stdout.1 stderr.1")
664
665 # move them to the original wdir so they can be picked up
666 os.system("mv stdout.1.gz %s/stdout.gz" % orig_wdir)
667 os.system("mv stderr.1.gz %s/stderr.gz" % orig_wdir)
668
669 if not status:
670 raise Exception('Application execution failed.')
671 printInfo('Application execution passed with exit code %d.' % exitcode)
672
673 createPackedOutputSandbox(outputsandbox,None,orig_wdir)
674
675 # pack outputsandbox
676 # printInfo('== check output ==')
677 # for line in os.popen('pwd; ls -l').readlines():
678 # printInfo(line)
679
680 printInfo('Pack outputsandbox passed.')
681 monitor.stop(exitcode)
682
683 # Clean up after us - All log files and packed outputsandbox should be in "wdir"
684 if scratchdir:
685 os.chdir(orig_wdir)
686 os.system("rm %s -rf" % wdir)
687 except Exception,e:
688 printError(sys.exc_info()[0])
689 printError(sys.exc_info()[1])
690 str_traceback = traceback.format_tb(sys.exc_info()[2])
691 for str_tb in str_traceback:
692 printError(str_tb)
693
694 printInfo('Job Wrapper stop.')
695
696 out.close()
697
698 # always return exit code 0 so the in the case of application failure
699 # one can always get stdout and stderr back to the UI for debug.
700 sys.exit(0)
701 """
702 return script
703
704 - def preparejob(self,jobconfig,master_job_sandbox):
705 '''Prepare the JDL'''
706
707 script = self.__jobWrapperTemplate__()
708
709 job = self.getJobObject()
710 inpw = job.getInputWorkspace()
711
712 wrapperlog = '__jobscript__.log'
713
714 import Ganga.Core.Sandbox as Sandbox
715
716 script = script.replace('###OUTPUTSANDBOX###',repr(jobconfig.outputbox))
717
718 script = script.replace('###APPLICATION_NAME###',job.application._name)
719 script = script.replace('###APPLICATIONEXEC###',repr(jobconfig.getExeString()))
720 script = script.replace('###APPLICATIONARGS###',repr(jobconfig.getArguments()))
721
722 if jobconfig.env:
723 script = script.replace('###APPLICATIONENVS###',repr(jobconfig.env))
724 else:
725 script = script.replace('###APPLICATIONENVS###',repr({}))
726
727 script = script.replace('###WRAPPERLOG###',repr(wrapperlog))
728 import inspect
729 script = script.replace('###INLINEMODULES###',inspect.getsource(Sandbox.WNSandbox))
730
731 mon = job.getMonitoringService()
732
733 self.monInfo = None
734
735
736 if type(self.monInfo) is type({}):
737 self.monInfo['remotefile'] = 'stdout'
738
739
740 try:
741 logger.debug('job info of monitoring service: %s' % str(self.monInfo))
742 except:
743 pass
744
745 script = script.replace('###MONITORING_SERVICE###',mon.getWrapperScriptConstructorText())
746
747
748 packed_files = jobconfig.getSandboxFiles() + Sandbox.getGangaModulesAsSandboxFiles(Sandbox.getDefaultModules()) + Sandbox.getGangaModulesAsSandboxFiles(mon.getSandboxModules())
749 sandbox_files = job.createPackedInputSandbox(packed_files)
750
751
752 sandbox_files.extend(master_job_sandbox)
753
754
755 lfc_host = ''
756
757 input_sandbox_uris = []
758 input_sandbox_names = []
759
760 ick = True
761
762 max_prestaged_fsize = 0
763 for f in sandbox_files:
764
765 idx = self.__check_and_prestage_inputfile__(f)
766
767 if not idx:
768 logger.error('input sandbox preparation failed: %s' % f)
769 ick = False
770 break
771 else:
772
773 if idx['lfc_host']:
774 lfc_host = idx['lfc_host']
775
776 if idx['remote']:
777 abspath = os.path.abspath(f)
778 fsize = os.path.getsize(abspath)
779
780 if fsize > max_prestaged_fsize:
781 max_prestaged_fsize = fsize
782
783 input_sandbox_uris.append( idx['remote'][ os.path.basename(f) ] )
784
785 input_sandbox_names.append( os.path.basename( urlparse(f)[2] ) )
786
787 if idx['local']:
788 input_sandbox_uris += idx['local']
789 input_sandbox_names.append( os.path.basename(f) )
790
791 if not ick:
792 logger.error('stop job submission')
793 return None
794
795
796
797 max_prestaged_fsize = 0
798 lfc_host = ''
799 transfer_timeout = config['SandboxTransferTimeout']
800 predict_timeout = int( math.ceil( max_prestaged_fsize/1000000.0 ) )
801
802 if predict_timeout > transfer_timeout:
803 transfer_timeout = predict_timeout
804
805 if transfer_timeout < 60:
806 transfer_timeout = 60
807
808 script = script.replace('###TRANSFERTIMEOUT###', '%d' % transfer_timeout)
809
810
811 script = script.replace('###INPUTSANDBOX###',repr({'remote':{}, 'local': input_sandbox_names }))
812
813
814 scriptPath = inpw.writefile(FileBuffer('__jobscript_%s__' % job.getFQID('.'),script),executable=1)
815 input_sandbox = input_sandbox_uris + [scriptPath]
816
817 for isb in input_sandbox:
818 logger.debug('ISB URI: %s' % isb)
819
820
821
822
823
824 output_sandbox = [wrapperlog]
825
826 if config['JobLogHandler'] in ['WMS']:
827 output_sandbox += ['stdout.gz','stderr.gz']
828
829 if len(jobconfig.outputbox):
830 output_sandbox += [Sandbox.OUTPUT_TARBALL_NAME]
831
832
833 jdl = {
834 'VirtualOrganisation' : config['VirtualOrganisation'],
835 'Executable' : os.path.basename(scriptPath),
836 'Environment': {'GANGA_LCG_VO': config['VirtualOrganisation'], 'GANGA_LOG_HANDLER': config['JobLogHandler'], 'LFC_HOST': lfc_host},
837 'StdOutput' : 'stdout',
838 'StdError' : 'stderr',
839 'InputSandbox' : input_sandbox,
840 'OutputSandbox' : output_sandbox,
841 'OutputSandboxBaseDestURI': 'gsiftp://localhost'
842 }
843
844 jdl['Environment'].update({'GANGA_LCG_CE': self.CE})
845 jdl['Requirements'] = self.requirements.merge(jobconfig.requirements).convert()
846
847 if self.jobtype.upper() in ['NORMAL','MPICH']:
848 jdl['JobType'] = self.jobtype.upper()
849 if self.jobtype.upper() == 'MPICH':
850
851 jdl['Requirements'].append('Member("MPICH",other.GlueHostApplicationSoftwareRunTimeEnvironment)')
852 jdl['NodeNumber'] = self.requirements.nodenumber
853 else:
854 logger.warning('JobType "%s" not supported' % self.jobtype)
855 return
856
857
858
859
860
861 jdlText = Grid.expandjdl(jdl)
862 logger.debug('subjob JDL: %s' % jdlText)
863 return inpw.writefile(FileBuffer('__jdlfile__',jdlText))
864
876
888
914
949
997
# --- fragment: master-submit method; 'def' line elided.  Selects/validates
# the CE endpoint, delegates the proxy and dispatches to single or bulk
# submission depending on whether the job has subjobs. ---
999 '''Submit the master job to the grid'''
1000
1001 profiler = ElapsedTimeProfiler(getLogger(name='Profile.LCG'))
1002 profiler.start()
1003
1004 job = self.getJobObject()
1005
1006
# let the requirements object propose allowed CEs; fall back silently
# when the requirements implementation does not support getce()
1007 allowed_celist = []
1008 try:
1009 allowed_celist = self.requirements.getce()
1010 if not self.CE and allowed_celist:
1011 self.CE = allowed_celist[0]
1012 except:
1013 logger.warning('CREAM CE assigment from AtlasCREAMRequirements failed.')
1014
# override a user-chosen CE that is not in the allowed list
1015 if self.CE and allowed_celist:
1016 if self.CE not in allowed_celist:
1017 logger.warning('submission to CE not allowed: %s, use %s instead' % ( self.CE, allowed_celist[0] ) )
1018 self.CE = allowed_celist[0]
1019
1020 if not self.CE:
1021 raise GangaException('CREAM CE endpoint not set')
1022
1023
# delegate the user proxy to the CE; failure is non-fatal here
1024 if not grids['GLITE'].cream_proxy_delegation(self.CE):
1025 logger.warning('proxy delegation to %s failed' % self.CE)
1026
1027
# single job -> generic IBackend path; split job -> bulk submission
1028 if len(job.subjobs) == 0:
1029 ick = IBackend.master_submit(self,rjobs,subjobconfigs,masterjobconfig)
1030 else:
1031 ick = self.master_bulk_submit(rjobs,subjobconfigs,masterjobconfig)
1032
1033 profiler.check('==> master_submit() elapsed time')
1034
1035 return ick
1036
1037 - def submit(self,subjobconfig,master_job_sandbox):
1038 '''Submit the job to the grid'''
1039
1040 ick = False
1041
1042 jdlpath = self.preparejob(subjobconfig,master_job_sandbox)
1043
1044 if jdlpath:
1045 self.id = grids['GLITE'].cream_submit(jdlpath,self.CE)
1046
1047 if self.id:
1048 self.actualCE = self.CE
1049 ick = True
1050
1051 return ick
1052
# --- fragment: resubmit method; 'def' line elided ---
1054 """
1055 Resubmit each subjob individually as bulk resubmission will overwrite
1056 previous master job statuses
1057 """
1058
1059
# a failed master job can be resubmitted as a whole in one go
1060 mj = self._getParent()
1061 if mj.status == 'failed':
1062 return self.master_resubmit(rjobs)
1063
# otherwise resubmit subjobs one by one; stop on the first failure
1064 for j in rjobs:
1065 if not j.backend.master_resubmit([j]):
1066 return False
1067
1068 return True
1069
1106
1126
1196
# expose the (elided in this listing) updateMonitoringInformation method
# as a static method, as required by Ganga's monitoring loop
1197 updateMonitoringInformation = staticmethod(updateMonitoringInformation)
1198
1228
# module-level logger and the shared 'LCG' configuration section used
# throughout this backend
1229 logger = getLogger()
1230
1231 config = getConfig('LCG')
1232
1233
# CREAM-specific sandbox base URIs (empty default = derive/skip at runtime)
1234 config.addOption('CreamInputSandboxBaseURI', '', 'sets the baseURI for getting the input sandboxes for the job')
1235 config.addOption('CreamOutputSandboxBaseURI', '', 'sets the baseURI for putting the output sandboxes for the job')
1236
1237
1238