1
2
3
4
5
6
7 from Ganga.GPIDev.Base import GangaObject
8 from Ganga.GPIDev.Schema import *
9 from Ganga.Lib.Mergers.Merger import runAutoMerge
10
11 import Ganga.Utility.logging
12 from Ganga.GPIDev.Adapters.IMerger import MergerError
13 logger = Ganga.Utility.logging.getLogger()
14
15 from Ganga.Utility.util import isStringLike
16 from Ganga.Utility.logging import log_user_exception
17
18 from Ganga.Core import GangaException
19 from Ganga.Core.GangaRepository import RegistryKeyError
20
21 from Ganga.GPIDev.Adapters.IApplication import PostprocessStatusUpdate
22
23
27
33 return "JobError: %s"%str(self.what)
34
35 import Ganga.Utility.guid
36
38 ''' Additional job information.
39 Partially implemented
40 '''
41 _schema = Schema(Version(0,1),{
42 'submit_counter' : SimpleItem(defvalue=0,protected=1,doc="job submission/resubmission counter"),
43 'monitor' : ComponentItem('monitor',defvalue=None,load_default=0,comparable=0, optional=1,doc="job monitor instance"),
44 'uuid' : SimpleItem(defvalue='',protected=1,comparable=0, doc='globally unique job identifier'),
45 'monitoring_links' : SimpleItem(defvalue=[],typelist=['tuple'],sequence=1,protected=1,copyable=0,doc="list of tuples of monitoring links")
46 })
47
48 _category = 'jobinfos'
49 _name = 'JobInfo'
50
53
54
57
58 from JobTime import JobTime
59
60 -class Job(GangaObject):
61 '''Job is an interface for submision, killing and querying the jobs :-).
62
63 Basic configuration:
64
65 The "application" attribute defines what should be
66 run. Applications may be generic arbitrary executable scripts or
67 complex, predefined objects.
68
69 The "backend" attribute defines where and how to run. Backend
70 object represents a resource or a batch system with various
71 configuration parameters.
72
73 Available applications, backends and other job components may be listed
74 using the plugins() function. See help on plugins() function.
75
76 The "status" attribute represents the state of Ganga job object.
77 It is automatically updated by the monitoring loop. Note that
78 typically at the backends the jobs have their own, more detailed
79 status. This information is typically available via
80 "job.backend.status" attribute.
81
82 Bookkeeping and persistency:
83
84 Job objects contain basic book-keeping information: "id", "status"
85 and "name". Job objects are automatically saved in a job
86 repository which may be a special directory on a local filesystem
87 or a remote database.
88
89 Input/output and file workspace:
90
91 There is an input/output directory called file workspace
92 associated with each job ("inputdir" and "outputdir" properties).
93 When a job is submitted, all input files are copied to the file
94 workspace to keep consistency of the input while the job is
95 running. Ganga then ships all files in the input workspace to the
96 backend systems in a sandbox.
97
98 The list of input files is defined by the application
99 (implicitly). Additional files may be explicitly specified in the
100 "inputsandbox" attribute.
101
102 Job splitting:
103
104 The "splitter" attributes defines how a large job may be divided into
105 smaller subjobs. The subjobs are automatically created when the main
106 (master) job is submitted. The "subjobs" attribute gives access to
107 individual subjobs. The "master" attribute of a subjob points back to the
108 master job.
109
110 Merging:
111
112 The "merger" attribute defines how the output of the subjobs may be merged.
113 Merging is not perfromed automatically and it is triggered by the merge() method.
114
115 Datasets: PENDING
116 Datasets are highly application and virtual organisation specific.
117 '''
    # Persistent schema (version 1.6) of the Job object.  Items marked
    # protected=1 are read-only from the GPI; transient=1 items are computed
    # on access (via 'getter') and never stored in the repository;
    # comparable=0 items are ignored when jobs are compared.
    _schema = Schema(Version(1,6),{ 'inputsandbox' : FileItem(defvalue=[],typelist=['str','Ganga.GPIDev.Lib.File.File.File'],sequence=1,doc="list of File objects shipped to the worker node "),
                                    'outputsandbox' : SimpleItem(defvalue=[],typelist=['str'],sequence=1,doc="list of filenames or patterns shipped from the worker node"),
                                    'info':ComponentItem('jobinfos',defvalue=None,doc='JobInfo '),
                                    'time':ComponentItem('jobtime', defvalue=None,protected=1,comparable=0,doc='provides timestamps for status transitions'),
                                    'application' : ComponentItem('applications',doc='specification of the application to be executed'),
                                    'backend': ComponentItem('backends',doc='specification of the resources to be used (e.g. batch system)'),
                                    'id' : SimpleItem('',protected=1,comparable=0,doc='unique Ganga job identifier generated automatically'),
                                    'status': SimpleItem('new',protected=1,checkset='_checkset_status',doc='current state of the job, one of "new", "submitted", "running", "completed", "killed", "unknown", "incomplete"'),
                                    'name':SimpleItem('',doc='optional label which may be any combination of ASCII characters',typelist=['str']),
                                    'inputdir':SimpleItem(getter="getStringInputDir",defvalue=None,transient=1,protected=1,comparable=0,load_default=0,optional=1,copyable=0,typelist=['str'],doc='location of input directory (file workspace)'),

                                    'outputdir':SimpleItem(getter="getStringOutputDir",defvalue=None,transient=1,protected=1,comparable=0,load_default=0,optional=1,copyable=0,typelist=['str'],doc='location of output directory (file workspace)'),

                                    'inputdata':ComponentItem('datasets',defvalue=None,typelist=['Ganga.GPIDev.Lib.Dataset.Dataset'],load_default=0,optional=1,doc='dataset definition (typically this is specific either to an application, a site or the virtual organization'),
                                    'outputdata':ComponentItem('datasets',defvalue=None,load_default=0,optional=1,doc='dataset definition (typically this is specific either to an application, a site or the virtual organization'),
                                    'splitter':ComponentItem('splitters',defvalue=None,load_default=0,optional=1,doc='optional splitter'),
                                    'subjobs':ComponentItem('jobs',defvalue=[],sequence=1,protected=1,load_default=0,copyable=0,optional=1,proxy_get="_subjobs_proxy",doc='list of subjobs (if splitting)',summary_print = '_subjobs_summary_print'),
                                    'master':ComponentItem('jobs',getter="_getParent",transient=1,protected=1,load_default=0,defvalue=None,optional=1,copyable=0,comparable=0,doc='master job',visitable=0),
                                    'merger':ComponentItem('mergers',defvalue=None,load_default=0,optional=1,doc='optional output merger'),
                                    'do_auto_resubmit':SimpleItem(defvalue = False, doc='Automatically resubmit failed subjobs'),
                                    'fqid':SimpleItem(getter="getStringFQID",transient=1,protected=1,load_default=0,defvalue=None,optional=1,copyable=0,comparable=0,typelist=['str'],doc='fully qualified job identifier',visitable=0)
                                    })

    # Plugin category and name under which this class is registered.
    _category = 'jobs'
    _name = 'Job'
    # Methods exported to the GPI proxy layer.
    _exportmethods = ['submit','remove','kill', 'resubmit', 'peek','fail', 'force_status','merge' ]

    # Name of the registry in which Job objects live by default.
    default_registry = 'jobs'


    # Attribute display hints for the GUI.
    _GUIPrefs = [ { 'attribute' : 'id' },
                  { 'attribute' : 'status' },
                  { 'attribute' : 'inputsandbox', 'displayLevel' : 1 },
                  { 'attribute' : 'inputdata' },
                  { 'attribute' : 'outputsandbox' } ]
155
159
161 return self.status != 'new'
162
163
164
166 try:
167 id = self.id
168 except KeyError:
169 id = None
170 try:
171 oldstat = self.status
172 except KeyError:
173 oldstat = None
174 logger.debug('job %s "%s" setting raw status to "%s"',str(id),str(oldstat),value)
175
176
177
178
179
180
181
    def __init__(self,state,transition_comment='',hook=None):
        """Describe one edge of the job status-transition graph.

        state              -- name of the target status.
        transition_comment -- human-readable note on what triggers the move.
        hook               -- optional name of a Job method to invoke when
                              updateStatus() performs this transition.
        """
        self.state = state
        self.transition_comment = transition_comment
        self.hook = hook
187
190 self.states = {}
191 for s in states:
192 assert(not s.state in self)
193 self[s.state] = s
194
    # Status-transition graph: maps the current job status to the set of
    # states reachable from it.  Each State carries a comment describing the
    # trigger and optionally the name of a hook method run by updateStatus()
    # when the transition fires.
    status_graph = {'new' : Transitions(State('submitting','j.submit()',hook='monitorSubmitting_hook'),
                                        State('removed','j.remove()')),
                    'submitting' : Transitions(State('new','submission failed',hook='rollbackToNewState'),
                                               State('submitted',hook='monitorSubmitted_hook'),
                                               State('unknown','forced remove OR remote jobmgr error'),
                                               State('failed','manually forced or keep_on_failed=True',hook='monitorFailed_hook')),
                    'submitted' : Transitions(State('running'),
                                              State('killed','j.kill()',hook='monitorKilled_hook'),
                                              State('unknown','forced remove'),
                                              State('failed', 'j.fail(force=1)',hook='monitorFailed_hook'),
                                              State('completing','job output already in outputdir',hook='postprocess_hook'),
                                              State('completed',hook='postprocess_hook'),
                                              State('submitting','j.resubmit(force=1)')),
                    'running' : Transitions(State('completing'),
                                            State('completed','job output already in outputdir',hook='postprocess_hook'),
                                            State('failed','backend reported failure OR j.fail(force=1)',hook='monitorFailed_hook'),
                                            State('killed','j.kill()',hook='monitorKilled_hook'),
                                            State('unknown','forced remove'),
                                            State('submitting','j.resubmit(force=1)'),
                                            State('submitted','j.resubmit(force=1)')),
                    'completing' : Transitions(State('completed',hook='postprocess_hook'),
                                               State('failed','postprocessing error OR j.fail(force=1)',hook='postprocess_hook_failed'),
                                               State('unknown','forced remove'),
                                               State('submitting','j.resubmit(force=1)'),
                                               State('submitted','j.resubmit(force=1)')),
                    'killed' : Transitions(State('removed','j.remove()'),
                                           State('failed','j.fail()'),
                                           State('submitting','j.resubmit()'),
                                           State('submitted','j.resubmit()')),
                    'failed' : Transitions(State('removed','j.remove()'),
                                           State('submitting','j.resubmit()'),
                                           State('submitted','j.resubmit()')),
                    'completed' : Transitions(State('removed','j.remove()'),
                                              State('failed','j.fail()'),
                                              State('submitting','j.resubmit()'),
                                              State('submitted','j.resubmit()')),
                    'incomplete' : Transitions(State('removed','j.remove()')),
                    'unknown' : Transitions(State('removed','forced remove')),
                    'template': Transitions(State('removed'))
                    }

    # Status values regarded as transient / initial by the surrounding
    # machinery (consumers are outside this chunk — confirm against callers).
    transient_states = ['incomplete','removed','unknown']
    initial_states = ['new','incomplete','template']
238
    def updateStatus(self,newstatus, transition_update = True):
        """ Move job to the new status, according to the state transition graph (Job.status_graph).
        If the transition is allowed:
          - call the specific transition hook (if it exists)
          - call the general transition update hook (triggers the auto merger and application hooks)
          - commit the job
        If the job cannot be committed, then revert to the old status and raise JobStatusError.
        If the transition is not allowed then raise JobStatusError.

        Transitions to the current state are allowed by default (so you can updateStatus('running') if the job is 'running').
        Such default transitions do not have hooks.
        """
        fqid = self.getFQID('.')
        logger.debug('attempt to change job %s status from "%s" to "%s"',fqid, self.status,newstatus)

        try:
            # Look up the transition edge; KeyError means the edge is not in
            # the graph.
            state = self.status_graph[self.status][newstatus]
        except KeyError:
            # Self-transitions are implicitly allowed and carry no hook.
            if newstatus == self.status:
                state = Job.State(newstatus)
            else:
                raise JobStatusError('forbidden status transition of job %s from "%s" to "%s"'%(fqid, self.status,newstatus))

        self._getWriteAccess()

        # Remember the current status so it can be restored if the commit (or
        # any hook) fails below.
        saved_status = self.status

        try:
            if state.hook:
                try:
                    getattr(self,state.hook)()
                except PostprocessStatusUpdate,x:
                    # A hook may override the destination status (e.g. a
                    # postprocessing step downgrading 'completed' to 'failed').
                    newstatus = x.status

            if transition_update:
                # General update hook; may also redirect the destination
                # status (e.g. auto resubmission logic).
                newstatus = self.transition_update(newstatus)

            if self.status != newstatus:
                # Record a timestamp only for real status changes.
                self.time.timenow(str(newstatus))
                logger.debug("timenow('%s') called.", self.status)
            else:
                logger.debug("Status changed from '%s' to '%s'. No new timestamp was written", self.status, newstatus)

            self.status = newstatus
            self._commit()
        except Exception,x:
            # Roll back to the previous status on any failure and surface it
            # as a JobStatusError.
            self.status = saved_status
            log_user_exception()
            raise JobStatusError(x)

        logger.info('job %s status changed to "%s"',fqid,self.status)
292
306
308 """
309 Update master job status based on the status of subjobs.
310 This is an auxiliary method for implementing bulk subjob monitoring.
311 """
312
313 j = self
314 stats = [s.status for s in j.subjobs]
315
316
317 if not stats:
318 logger.warning('ignoring master job status updated for job %s (NOT MASTER)',self.getFQID('.'))
319 return
320
321 new_stat = None
322
323 for s in ['submitting','submitted','running','failed','killed','completing','completed']:
324 if s in stats:
325 new_stat = s
326 break
327
328 if new_stat == j.status:
329 return
330
331 if not new_stat:
332 logger.critical('undefined state for job %d, stats=%s',j.id,str(stats))
333
334 j.updateStatus(new_stat)
335
339
341 """Let monitoring services work with the subjobconfig after it's prepared"""
342 self.getMonitoringService().prepare(subjobconfig=subjobconfig)
343
347
351
355
359
362
365
368
381
382
385
386
396
411
419
422
425
428
430 """Return a fully qualified job id (within registry): a list of ids [masterjob_id,subjob_id,...]
431 If optional sep is specified FQID string is returned, ids are separated by sep.
432 For example: getFQID('.') will return 'masterjob_id.subjob_id....'
433 """
434
435 fqid = [self.id]
436 cur = self._getParent()
437 while cur:
438 fqid.append(cur.id)
439 cur = cur._getParent()
440 fqid.reverse()
441
442 if sep:
443 return sep.join([str(id) for id in fqid])
444 return fqid
445
448
451
454
460
461 - def peek( self, filename = "", command = "" ):
462 '''
463 Allow viewing of job output (and input) files
464
465 Arguments other than self:
466 filename : name of file to be viewed
467 => For backends where this is enabled, the filename
468 for a job in the "running" state is relative to
469 the job's work directory unless the filename begins
470 with "../". In all other cases, the filename is
471 relative to the job's output directory
472 command : command to be used for viewing the file
473 => If no command is given, then the command defined in
474 the [File_Associations] section of the Ganga
475 configuration file(s) is used
476
477 Example usage:
478
479 # examine contents of output/work directory
480 peek()
481
482 # examine contents of output directory,
483 # even in case of job in "running" state
484 peek( "../output" )
485
486 # examine contents of input directory
487 peek( "../input" )
488
489 # View ROOT histograms, running root.exe in a new terminal window
490 peek( "histograms.root", "root.exe &&" )
491
492 # View contents of file in output/work directory, using
493 # command defined in configuration file
494 peek( "output.txt" )
495
496 # View ROOT histograms in ouput/work directory,
497 # running root.exe in a new terminal window
498 peek( "histograms.root", "root.exe &&" )
499
500 Return value: None
501 '''
502
503 import os
504 pathStart = filename.split( os.sep )[ 0 ]
505 if ( ( "running" == self.status ) and ( pathStart != ".." ) ):
506 self.backend.peek( filename = filename, command = command )
507 else:
508 topdir = os.path.dirname( self.inputdir.rstrip( os.sep ) )
509 path = os.path.join( topdir, "output", filename ).rstrip( os.sep )
510 self.viewFile( path = path, command = command )
511 return None
512
513 - def viewFile( self, path = "", command = "" ):
514 '''
515 Allow file viewing
516
517 Arguments other than self:
518 path : path to file to be viewed
519 command : command to be used for viewing the file
520 => If no command is given, then the command defined in
521 the [File_Associations] section of the Ganga
522 configuration file(s) is used
523
524 This is intended as a helper function for the peek() method.
525
526 Return value: None
527 '''
528
529 import os
530 from Ganga.Utility.Config import ConfigError, getConfig
531 from exceptions import IndexError
532 config = getConfig( "File_Associations" )
533 if os.path.exists( path ):
534 if os.path.islink( path ):
535 path = os.readlink( path )
536 if not command:
537 if os.path.isdir( path ):
538 command = config[ "listing_command" ]
539 else:
540 suffix = os.path.splitext( path )[ 1 ].lstrip( "." )
541 try:
542 command = config[ suffix ]
543 except ConfigError:
544 command = config[ "fallback_command" ]
545
546 mode = os.P_WAIT
547
548 try:
549 tmpList = command.split( "&&" )
550 termCommand = tmpList[ 1 ]
551 if not termCommand:
552 termCommand = config[ "newterm_command" ]
553 exeopt = config[ "newterm_exeopt" ]
554 exeCommand = " ".join( [ tmpList[ 0 ], path ] )
555 argList = [ termCommand, exeopt, exeCommand ]
556 mode = os.P_NOWAIT
557 except IndexError:
558 tmpList = command.split( "&" )
559 if ( len( tmpList ) > 1 ):
560 mode = os.P_NOWAIT
561 command = tmpList[ 0 ]
562 argList = command.split()
563 argList.append( path )
564
565 cmd = argList[ 0 ]
566 os.spawnvp( mode, cmd, argList )
567 else:
568 logger.warning( "File/directory '%s' not found" % path )
569
570 return None
571
    def submit(self,keep_going=None,keep_on_fail=None):
        '''Submits a job. Return true on success.

        First the application is configured which may generate
        additional input files, preprocess executable scripts etc.
        Then backend handler is used to submit the configured job.
        The job is automatically checkpointed to persistent storage.

        The default values of keep_going and keep_on_fail are controlled by [GPI_Semantics] configuration options.

        When the submission fails the job status is automatically
        reverted to new and all files in the input directory are
        deleted (keep_on_fail=False is the default behaviour unless modified in configuration).
        If keep_on_fail=True then the job status
        is moved to the failed status and input directory is left intact.
        This is helpful for debugging and implements the request #43143.

        For split jobs: consult https://twiki.cern.ch/twiki/bin/view/ArdaGrid/GangaSplitters#Subjob_submission
        '''
        from Ganga.Utility.Config import ConfigError, getConfig
        gpiconfig = getConfig('GPI_Semantics')

        # Fill in the configured defaults for parameters not given explicitly.
        if keep_going is None:
            keep_going = gpiconfig['job_submit_keep_going']

        if keep_on_fail is None:
            keep_on_fail = gpiconfig['job_submit_keep_on_fail']

        from Ganga.Core import ApplicationConfigurationError, JobManagerError, IncompleteJobSubmissionError, GangaException

        # Detect via introspection whether this backend's master_submit
        # accepts the optional keep_going argument.
        import inspect
        supports_keep_going = 'keep_going' in inspect.getargspec(self.backend.master_submit)[0]

        if keep_going and not supports_keep_going:
            msg = 'job.submit(keep_going=True) is not supported by %s backend'%self.backend._name
            logger.error(msg)
            raise JobError(msg)

        # Only freshly created jobs may be submitted.
        if self.status != 'new':
            msg = "cannot submit job %s which is in '%s' state"%(self.getFQID('.'),self.status)
            logger.error(msg)
            raise JobError(msg)

        # A 'new' job must not yet have subjobs.  NOTE: assert is stripped
        # under python -O; this is a sanity check, not input validation.
        assert(self.subjobs == [])

        # Subjobs are submitted via their master only.
        if self.master is not None:
            msg = "Cannot submit subjobs directly."
            logger.error(msg)
            raise JobError(msg)

        # Find the runtime handler matching this (application, backend) pair.
        from Ganga.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
        try:
            rtHandler = allHandlers.get(self.application._name,self.backend._name)()
        except KeyError,x:
            msg = 'runtime handler not found for application=%s and backend=%s'%(self.application._name,self.backend._name)
            logger.error(msg)
            raise JobError(msg)

        try:
            logger.info("submitting job %d",self.id)

            self.updateStatus('submitting')

            try:
                # Checkpoint the 'submitting' state before doing real work.
                self._commit()
            except Exception,x:
                msg = 'cannot commit the job %s, submission aborted'%self.id
                logger.error(msg)
                self.status = 'new'
                raise JobError(msg)

            self.getDebugWorkspace().remove(preserve_top=True)

            # Master-level application configuration (may create input files).
            appmasterconfig = self.application.master_configure()[1]

            # NOTE(review): 'if 1:' looks like a remnant kept to preserve the
            # indentation of a historical code block.
            if 1:
                if self.splitter:
                    # Split the master job; each subjob gets a fresh uuid,
                    # status, timestamp and a sequential id.
                    subjobs = self.splitter.validatedSplit(self)

                    i = 0
                    for j in subjobs:
                        j.info.uuid = Ganga.Utility.guid.uuid()
                        j.status='new'
                        j.time.timenow('new')
                        j.id = i
                        i += 1
                        self.subjobs.append(j)

                    for j in self.subjobs:
                        j._init_workspace()

                    rjobs = self.subjobs
                    self._commit()
                else:
                    # No splitter: submit the master job itself.
                    rjobs = [self]

            # Per-(sub)job application configuration.
            appsubconfig = [ j.application.configure(appmasterconfig)[1] for j in rjobs ]
            appconfig = (appmasterconfig,appsubconfig)

            # Backend-specific preparation, master level then per (sub)job.
            jobmasterconfig = rtHandler.master_prepare(self.application,appmasterconfig)

            jobsubconfig = [ rtHandler.prepare(j.application,s,appmasterconfig,jobmasterconfig) for (j,s) in zip(rjobs,appsubconfig) ]

            # Let monitoring services inspect the prepared configuration.
            self.monitorPrepare_hook(jobsubconfig)

            try:
                if supports_keep_going:
                    r = self.backend.master_submit(rjobs,jobsubconfig,jobmasterconfig,keep_going)
                else:
                    r = self.backend.master_submit(rjobs,jobsubconfig,jobmasterconfig)

                if not r:
                    raise JobManagerError('error during submit')

                # Count a submission attempt on every subjob.
                if self.subjobs:
                    for jobs in self.subjobs:
                        jobs.info.increment()

            except IncompleteJobSubmissionError,x:
                # Partial success is tolerated: log and proceed to 'submitted'.
                logger.warning('Not all subjobs have been sucessfully submitted: %s',x)
            self.info.increment()
            self.updateStatus('submitted')
            self._commit()

            # Usage-statistics reporting: (master, single, subjob) counts.
            from Ganga.Runtime.spyware import ganga_job_submitted

            if len(self.subjobs) == 0:
                ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "1", "0", "0")
            else:
                submitted_count = 0
                for sj in self.subjobs:
                    if sj.status == 'submitted':
                        submitted_count+=1

                ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "0", "1", str(submitted_count))

            return 1
        except Exception,x:
            # GangaExceptions are expected failures: log briefly; anything
            # else gets a full traceback.
            if isinstance(x,GangaException):
                log_user_exception(logger,debug = True)
                logger.error(str(x))
            else:
                log_user_exception(logger,debug = False)

            if keep_on_fail:
                self.updateStatus('failed')
            else:
                # Default behaviour: roll the job back to 'new'.
                logger.error('%s ... reverting job %s to the new status', str(x), self.getFQID('.') )
                self.updateStatus('new')
            raise JobError(x)
747
769
770
    def remove(self,force=False):
        '''Remove the job.

        If job has been submitted try to kill it first. Then remove
        the file workspace associated with the job.

        If force=True then remove job without killing it.
        '''

        template = self.status=='template'

        if template:
            logger.info('removing template %d',self.id)
        else:
            logger.info('removing job %d',self.id)

        if self.status == 'removed':
            msg = 'job %d already removed'%self.id
            logger.error(msg)
            raise JobError(msg)

        # A job downloading its output must be forced to 'failed' first.
        if self.status == 'completing':
            msg = 'job %s is completing (may be downloading output), do force_status("failed") and then remove() again'%self.getFQID('.')
            logger.error(msg)
            raise JobError(msg)

        # Subjobs can only be removed together with their master.
        if self.master:
            msg = 'cannot remove subjob %s'%self.getFQID('.')
            logger.info(msg)
            raise JobError(msg)

        try:
            self._getWriteAccess()
        except RegistryKeyError:
            # The repository record is broken/missing: just deregister.
            if self._registry:
                self._registry._remove(self,auto_removed=1)
            return

        # Try to kill a live job first (unless force=True); kill failures are
        # logged but do not block the removal.
        if self.status in ['submitted','running']:
            try:
                if not force:
                    self._kill(transition_update=False)
            except GangaException,x:
                log_user_exception(logger,debug = True)
            except Exception,x:
                log_user_exception(logger)
                logger.warning('unhandled exception in j.kill(), job id=%d',self.id)

        if not self.status in ['incomplete','unknown']:
            # Give the backend a chance to clean up its own resources.
            if hasattr(self.backend,'remove'):
                self.backend.remove()

            # Notify the application (and each subjob's application) of the
            # removal; applications without this hook are silently skipped.
            try:
                self.application.transition_update("removed")
                for sj in self.subjobs:
                    sj.application.transition_update("removed")
            except AttributeError:
                pass

        if self._registry:
            self._registry._remove(self,auto_removed=1)

        self.status = 'removed'

        if not template:
            # Templates have no file workspace; real jobs do — delete the
            # whole input workspace tree and the debug workspace.
            from Ganga.Core.FileWorkspace import InputWorkspace
            wsp = InputWorkspace()
            wsp.subpath=''
            wsp.jobid = self.id

            def doit(f):
                # Run a cleanup callable, downgrading OSError to a warning.
                try:
                    f()
                except OSError,x:
                    logger.warning('cannot remove file workspace associated with the job %d : %s',self.id,str(x))

            doit(wsp.remove)

            self.getDebugWorkspace(create=False).remove(preserve_top=False)
859
860
    def fail(self,force=False):
        """Deprecated. Use force_status('failed') instead."""
        # Kept only because 'fail' is part of the exported GPI interface;
        # it unconditionally raises to redirect callers to force_status().
        raise JobError('fail() method is deprecated, use force_status("failed") instead.')
864
    # States that force_status() may impose, mapped to the list of states a
    # job must currently be in for the forced transition to be accepted.
    allowed_force_states = { 'completed' : ['completing','failed'],
                             'failed' : ["submitting","completing","completed","submitted","running","killed"] }
867
869 ''' Force job to enter the "failed" or "completed" state. This may be
870 used for marking jobs "bad" jobs or jobs which are stuck in one of the
871 internal ganga states (e.g. completing).
872
873 To see the list of allowed states do: job.force_status(None)
874 '''
875
876 if status is None:
877 logger.info("The following states may be forced")
878 revstates = {}
879 for s1 in Job.allowed_force_states:
880 for s2 in Job.allowed_force_states[s1]:
881 revstates.setdefault(s2,{})
882 revstates[s2][s1] = 1
883
884 for s in revstates:
885 logger.info("%s => %s"%(s,revstates[s].keys()))
886 return
887
888 if self.status == status:
889 return
890
891 if not status in Job.allowed_force_states:
892 raise JobError('force_status("%s") not allowed. Job may be forced to %s states only.'%(status,Job.allowed_force_states.keys()))
893
894 if not self.status in Job.allowed_force_states[status]:
895 raise JobError('Only a job in one of %s may be forced into "failed" (job %s)'%(str(Job.allowed_force_states[status]),self.getFQID('.')))
896
897 if not force:
898 if self.status in ['submitted','running']:
899 try:
900 self._kill(transition_update=False)
901 except JobError,x:
902 x.what += "Use force_status('%s',force=True) to ignore kill errors."%status
903 raise x
904 try:
905 self.updateStatus(status)
906 logger.info('Job %s forced to status "%s"',self.getFQID('.'),status)
907 except JobStatusError,x:
908 logger.error(x)
909 raise x
910
915
916 - def _kill(self,transition_update):
958
960 """Resubmit a failed or completed job. A backend object may
961 be specified to change some submission parameters (which
962 parameters may be effectively changed depends on a
963 particular backend implementation).
964
965 Example:
966 b = j.backend.copy()
967 b.CE = 'some CE'
968 j.resubmit(backend=b)
969
970 Note: it is not possible to change the type of the backend in this way.
971
972 """
973 return self._resubmit(backend=backend)
974
976 """ Private method used for auto resubmit functionality by the monitoring loop.
977 """
978 return self._resubmit(auto_resubmit=True)
979
980
    def _resubmit(self,backend=None,auto_resubmit=False):
        """ Internal implementation of resubmit which handles the publicly accessible resubmit() method proper as well
        as the auto_resubmit use case used in the monitoring loop.
        """

        from Ganga.Core import GangaException, IncompleteJobSubmissionError, JobManagerError

        fqid = self.getFQID('.')
        logger.info('resubmitting job %s',fqid)

        # Auto-resubmission must never change the backend.
        if backend and auto_resubmit:
            msg = "job %s: cannot change backend when auto_resubmit=True. This is most likely an internal implementation problem."%fqid
            logger.error(msg)
            raise JobError(msg)

        # Manual resubmit is only allowed from a terminal state.
        if not self.status in ['completed','failed','killed'] and not auto_resubmit:
            msg = "cannot resubmit job %s which is in '%s' state"%(fqid,self.status)
            logger.error(msg)
            raise JobError(msg)

        # The caller passes a GPI proxy; unwrap it to the implementation
        # object.
        backendProxy = backend
        backend = None

        if backendProxy:
            backend = backendProxy._impl

        # The backend class itself may never change at resubmission.
        if backend and self.backend._name != backend._name:
            msg = "cannot resubmit job %s: change of the backend type is not allowed"%fqid
            logger.error(msg)
            raise JobError(msg)

        # An identical backend object means "no change requested".
        if backend == self.backend:
            backend = None

        # Detect via introspection whether master_resubmit takes the optional
        # backend argument.
        import inspect
        supports_master_resubmit = len(inspect.getargspec(self.backend.master_resubmit)[0])>1

        if not supports_master_resubmit and backend:
            raise JobError('%s backend does not support changing of backend parameters at resubmission (optional backend argument is not supported)'%self.backend._name)

        def check_changability(obj1,obj2):
            # Recursively verify that only schema items flagged
            # changable_at_resubmit differ between the old and new backend.
            for name,item in obj1._schema.allItems():
                v1 = getattr(obj1,name)
                v2 = getattr(obj2,name)
                if not item['changable_at_resubmit'] and item['copyable']:
                    if v1 != v2:
                        raise JobError('%s.%s attribute cannot be changed at resubmit'%(obj1._name,name))
                if item.isA('ComponentItem'):
                    check_changability(v1,v2)

        if backend:
            check_changability(self.backend,backend)

        # Remember the status so it can be restored on any failure below.
        oldstatus = self.status

        self.updateStatus('submitting')

        try:
            self._commit()
        except Exception,x:
            msg = 'cannot commit the job %s, resubmission aborted'%fqid
            logger.error(msg)
            self.status = oldstatus
            raise JobError(msg)

        self.getDebugWorkspace().remove(preserve_top=True)

        try:
            # Select the (sub)jobs to resubmit: all subjobs, only the failed
            # ones (auto mode), or the job itself when there are no subjobs.
            rjobs = self.subjobs
            if not rjobs:
                rjobs = [self]
            elif auto_resubmit:
                rjobs = [s for s in rjobs if s.status in ['failed'] ]

            if rjobs:
                for sjs in rjobs:
                    sjs.info.increment()
                    # Clear stale output before the new attempt.
                    sjs.getOutputWorkspace().remove(preserve_top=True)

            try:
                if auto_resubmit:
                    result = self.backend.master_auto_resubmit(rjobs)
                else:
                    if backend is None:
                        result = self.backend.master_resubmit(rjobs)
                    else:
                        result = self.backend.master_resubmit(rjobs,backend=backend)

                if not result:
                    raise JobManagerError('error during submit')
            except IncompleteJobSubmissionError,x:
                # Partial success is tolerated: log and carry on.
                logger.warning('Not all subjobs of job %s have been sucessfully re-submitted: %s',fqid,x)

            # Timestamp the resubmission on the subjobs (or the job itself).
            if self.subjobs:
                for sjs in self.subjobs:
                    sjs.time.timenow('resubmitted')
            else:
                self.time.timenow('resubmitted')

            self.status = 'submitted'
            self._commit()

            # Usage-statistics reporting: a dotted fqid means this is a
            # subjob resubmission.
            from Ganga.Runtime.spyware import ganga_job_submitted

            if fqid.find('.') > 0:
                ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "0", "0", "1")

            elif len(self.subjobs) == 0:
                ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "1", "0", "0")

            else:
                submitted_count = 0
                for sj in self.subjobs:
                    if sj.status == 'submitted':
                        submitted_count+=1

                ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "0", "0", str(submitted_count))

        except GangaException,x:
            # Restore the original status on failure and re-raise.
            logger.error("failed to resubmit job, %s" % (str(x),))
            logger.warning('reverting job %s to the %s status', fqid, oldstatus )
            self.status = oldstatus
            self._commit()
            raise
1120
1122 """ Helper method to unconditionally commit to the repository. The 'objects' list specifies objects
1123 to be commited (for example the subjobs). If objects are not specified then just the self is commited """
1124
1125 if objects is None:
1126 objects = [self]
1127
1128 objects = [self._getRoot()]
1129 reg = self._getRegistry()
1130 if not reg is None:
1131 reg._flush(objects)
1132
1133
1135
1136
1137
1138
1139
1140 return v
1141
1143 if self.id is None:
1144 id = "None"
1145 else:
1146 id = self.getFQID('.')
1147
1148
1149
1150
1151 return "%s#%s"%(str(self.__class__.__name__),id)
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161 - def merge(self, subjobs = None, sum_outputdir = None, **options):
1162 '''Merge the output of subjobs.
1163
1164 By default merge all subjobs into the master outputdir.
1165 The output location and the list of subjobs may be overriden.
1166 The options (keyword arguments) are passed onto the specific merger implementation.
1167 Refer to the specific merger documentation for more information about available options.
1168 '''
1169
1170 self._getWriteAccess()
1171
1172
1173 if (subjobs is not None and isStringLike(subjobs)) or (sum_outputdir is not None and not isStringLike(sum_outputdir)):
1174
1175 temp = subjobs
1176 subjobs = sum_outputdir
1177 sum_outputdir = temp
1178 logger.warning('Deprecated use of job.merge(sum_outputdir, subjobs), swap your arguments, will break in the future, it should be job.merge(subjobs, sum_outputdir)')
1179
1180 if sum_outputdir is None:
1181 sum_outputdir = self.outputdir
1182
1183 if subjobs is None:
1184 subjobs = self.subjobs
1185
1186 try:
1187 if self.merger:
1188 self.merger.merge(subjobs, sum_outputdir, **options)
1189 else:
1190 logger.warning('Cannot merge job %d: merger is not defined'%self.id)
1191 except Exception,x:
1192 log_user_exception()
1193 raise
1194
1202
1206
1207
1209 """A placeholder for Job configuration parameters.
1210
1211 JobTemplates are normal Job objects but they are never submitted. They have their own JobRegistry, so they do not get mixed up with
1212 normal jobs. They have always a "template" status.
1213
1214 Create a job with an existing job template t:
1215
1216 j = Job(t)
1217
1218 Save a job j as a template t:
1219
1220 t = JobTemplate(j)
1221
1222 You may save commonly used job parameters in a template and create new jobs easier and faster.
1223 """
1224 _category = 'jobs'
1225 _name = 'JobTemplate'
1226
1227 _schema = Job._schema.inherit_copy()
1228 _schema.datadict["status"] = SimpleItem('template',protected=1,checkset='_checkset_status',doc='current state of the job, one of "new", "submitted", "running", "completed", "killed", "unknown", "incomplete"')
1229
1230 default_registry = 'templates'
1231
1235
1238
1239
    def remove(self,force=False):
        '''See Job for documentation. The force optional argument has no effect (it is provided for the compatibility with Job interface)'''
        # Delegate to Job.remove(); 'force' is deliberately dropped because a
        # template is never submitted, so there is nothing to kill.
        return super(JobTemplate,self).remove()
1243
1245 """ Templates may not be submitted, return false."""
1246 return super(JobTemplate,self).submit()
1247
1249 """ Templates may not be killed, return false."""
1250 return super(JobTemplate,self).kill()
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596