
Source Code for Module Ganga.GPIDev.Lib.Job.Job

   1  ################################################################################ 
   2  # Ganga Project. http://cern.ch/ganga 
   3  # 
   4  # $Id: Job.py,v 1.13 2009-07-14 12:43:41 moscicki Exp $ 
   5  ################################################################################ 
   6   
   7  from Ganga.GPIDev.Base import GangaObject 
   8  from Ganga.GPIDev.Schema import * 
   9  from Ganga.Lib.Mergers.Merger import runAutoMerge 
  10   
  11  import Ganga.Utility.logging 
  12  from Ganga.GPIDev.Adapters.IMerger import MergerError 
  13  logger = Ganga.Utility.logging.getLogger() 
  14   
  15  from Ganga.Utility.util import isStringLike 
  16  from Ganga.Utility.logging import log_user_exception 
  17   
  18  from Ganga.Core import GangaException 
  19  from Ganga.Core.GangaRepository import RegistryKeyError 
  20   
  21  from Ganga.GPIDev.Adapters.IApplication import PostprocessStatusUpdate 
  22   
  23   
  24  class JobStatusError(GangaException):
  25      def __init__(self,*args):
  26          GangaException.__init__(self,*args)
  27 
  28  class JobError(GangaException):
  29      def __init__(self,what):
  30          GangaException.__init__(self,what)
  31          self.what=what
  32      def __str__(self):
  33          return "JobError: %s"%str(self.what)
  34 
  35  import Ganga.Utility.guid
  36 
  37  class JobInfo(GangaObject):
  38      ''' Additional job information.
  39          Partially implemented
  40      '''
  41      _schema = Schema(Version(0,1),{
  42          'submit_counter' : SimpleItem(defvalue=0,protected=1,doc="job submission/resubmission counter"),
  43          'monitor' : ComponentItem('monitor',defvalue=None,load_default=0,comparable=0, optional=1,doc="job monitor instance"),
  44          'uuid' : SimpleItem(defvalue='',protected=1,comparable=0, doc='globally unique job identifier'),
  45          'monitoring_links' : SimpleItem(defvalue=[],typelist=['tuple'],sequence=1,protected=1,copyable=0,doc="list of tuples of monitoring links")
  46          })
  47 
  48      _category = 'jobinfos'
  49      _name = 'JobInfo'
  50 
  51      def __init__(self):
  52          super(JobInfo, self).__init__()
  53          #self.submit_counter = 0
  54 
  55      def increment(self):
  56          self.submit_counter += 1
  57 
  58  from JobTime import JobTime
  59 
  60  class Job(GangaObject):
  61      '''Job is an interface for submission, killing and querying of jobs.
  62 
  63      Basic configuration:
  64 
  65      The "application" attribute defines what should be
  66      run. Applications may be generic arbitrary executable scripts or
  67      complex, predefined objects.
  68 
  69      The "backend" attribute defines where and how to run. A backend
  70      object represents a resource or a batch system with various
  71      configuration parameters.
  72 
  73      Available applications, backends and other job components may be listed
  74      using the plugins() function. See help on the plugins() function.
  75 
  76      The "status" attribute represents the state of the Ganga job object.
  77      It is automatically updated by the monitoring loop. Note that
  78      typically at the backends the jobs have their own, more detailed
  79      status. This information is typically available via the
  80      "job.backend.status" attribute.
  81 
  82      Bookkeeping and persistency:
  83 
  84      Job objects contain basic book-keeping information: "id", "status"
  85      and "name". Job objects are automatically saved in a job
  86      repository which may be a special directory on a local filesystem
  87      or a remote database.
  88 
  89      Input/output and file workspace:
  90 
  91      There is an input/output directory called the file workspace
  92      associated with each job ("inputdir" and "outputdir" properties).
  93      When a job is submitted, all input files are copied to the file
  94      workspace to keep the input consistent while the job is
  95      running. Ganga then ships all files in the input workspace to the
  96      backend systems in a sandbox.
  97 
  98      The list of input files is defined by the application
  99      (implicitly). Additional files may be explicitly specified in the
 100      "inputsandbox" attribute.
 101 
 102      Job splitting:
 103 
 104      The "splitter" attribute defines how a large job may be divided into
 105      smaller subjobs. The subjobs are automatically created when the main
 106      (master) job is submitted. The "subjobs" attribute gives access to
 107      individual subjobs. The "master" attribute of a subjob points back to the
 108      master job.
 109 
 110      Merging:
 111 
 112      The "merger" attribute defines how the output of the subjobs may be merged.
 113      Merging is not performed automatically; it is triggered by the merge() method.
 114 
 115      Datasets: PENDING
 116      Datasets are highly application and virtual organisation specific.
 117      '''
 118      _schema = Schema(Version(1,6),{ 'inputsandbox' : FileItem(defvalue=[],typelist=['str','Ganga.GPIDev.Lib.File.File.File'],sequence=1,doc="list of File objects shipped to the worker node"),
 119                                      'outputsandbox' : SimpleItem(defvalue=[],typelist=['str'],sequence=1,doc="list of filenames or patterns shipped from the worker node"),
 120                                      'info':ComponentItem('jobinfos',defvalue=None,doc='JobInfo'),
 121                                      'time':ComponentItem('jobtime', defvalue=None,protected=1,comparable=0,doc='provides timestamps for status transitions'),
 122                                      'application' : ComponentItem('applications',doc='specification of the application to be executed'),
 123                                      'backend': ComponentItem('backends',doc='specification of the resources to be used (e.g. batch system)'),
 124                                      'id' : SimpleItem('',protected=1,comparable=0,doc='unique Ganga job identifier generated automatically'),
 125                                      'status': SimpleItem('new',protected=1,checkset='_checkset_status',doc='current state of the job, one of "new", "submitted", "running", "completed", "killed", "unknown", "incomplete"'),
 126                                      'name':SimpleItem('',doc='optional label which may be any combination of ASCII characters',typelist=['str']),
 127                                      'inputdir':SimpleItem(getter="getStringInputDir",defvalue=None,transient=1,protected=1,comparable=0,load_default=0,optional=1,copyable=0,typelist=['str'],doc='location of input directory (file workspace)'),
 128 
 129                                      'outputdir':SimpleItem(getter="getStringOutputDir",defvalue=None,transient=1,protected=1,comparable=0,load_default=0,optional=1,copyable=0,typelist=['str'],doc='location of output directory (file workspace)'),
 130 
 131                                      'inputdata':ComponentItem('datasets',defvalue=None,typelist=['Ganga.GPIDev.Lib.Dataset.Dataset'],load_default=0,optional=1,doc='dataset definition (typically this is specific either to an application, a site or the virtual organization)'),
 132                                      'outputdata':ComponentItem('datasets',defvalue=None,load_default=0,optional=1,doc='dataset definition (typically this is specific either to an application, a site or the virtual organization)'),
 133                                      'splitter':ComponentItem('splitters',defvalue=None,load_default=0,optional=1,doc='optional splitter'),
 134                                      'subjobs':ComponentItem('jobs',defvalue=[],sequence=1,protected=1,load_default=0,copyable=0,optional=1,proxy_get="_subjobs_proxy",doc='list of subjobs (if splitting)',summary_print = '_subjobs_summary_print'),
 135                                      'master':ComponentItem('jobs',getter="_getParent",transient=1,protected=1,load_default=0,defvalue=None,optional=1,copyable=0,comparable=0,doc='master job',visitable=0),
 136                                      'merger':ComponentItem('mergers',defvalue=None,load_default=0,optional=1,doc='optional output merger'),
 137                                      'do_auto_resubmit':SimpleItem(defvalue = False, doc='Automatically resubmit failed subjobs'),
 138                                      'fqid':SimpleItem(getter="getStringFQID",transient=1,protected=1,load_default=0,defvalue=None,optional=1,copyable=0,comparable=0,typelist=['str'],doc='fully qualified job identifier',visitable=0)
 139                                      })
 140 
 141      _category = 'jobs'
 142      _name = 'Job'
 143      _exportmethods = ['submit','remove','kill', 'resubmit', 'peek','fail', 'force_status','merge' ]
 144 
 145      default_registry = 'jobs'
 146 
 147      # preferences for the GUI...
 148      _GUIPrefs = [ { 'attribute' : 'id' },
 149                    { 'attribute' : 'status' },
 150                    { 'attribute' : 'inputsandbox', 'displayLevel' : 1 },
 151                    { 'attribute' : 'inputdata' },
 152                    { 'attribute' : 'outputsandbox' } ]
 153 
 154 
 155      # TODO: usage of **kwds may be envisaged at this level to optimize the overriding of values, this must be reviewed
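
The attributes defined in the schema above are normally set from the Ganga GPI rather than by instantiating this class directly. A minimal usage sketch, assuming an interactive Ganga (GPI) session in which Job and the standard Executable application and Local backend plugins are exported:

    j = Job()
    j.name = 'hello'
    j.application = Executable(exe='/bin/echo', args=['hello world'])
    j.backend = Local()
    j.submit()           # configure the application, then hand the job to the backend
    print j.status       # updated by the monitoring loop: submitted -> running -> completed
    print j.outputdir    # file workspace into which the output sandbox is returned
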
156 - def __init__(self):
157 super(Job, self).__init__() 158 self.time.newjob() #<-----------NEW: timestamp method
159
160 - def _readonly(self):
161 return self.status != 'new'
 162 
 163      # status may only be set directly using the updateStatus() method
 164      # only modules coming from the Ganga.GPIDev package may change it directly
 165      def _checkset_status(self,value):
 166          try:
 167              id = self.id
 168          except KeyError:
 169              id = None
 170          try:
 171              oldstat = self.status
 172          except KeyError:
 173              oldstat = None
 174          logger.debug('job %s "%s" setting raw status to "%s"',str(id),str(oldstat),value)
 175          #import inspect,os
 176          #frame = inspect.stack()[2]
 177          ##if not frame[0].f_code.co_name == 'updateStatus' and
 178          #if frame[0].f_code.co_filename.find('/Ganga/GPIDev/')==-1 and frame[0].f_code.co_filename.find('/Ganga/Core/')==-1:
 179          #    raise AttributeError('cannot modify job.status directly, use job.updateStatus() method instead...')
 180          #del frame
 181 
 182      class State:
 183          def __init__(self,state,transition_comment='',hook=None):
 184              self.state = state
 185              self.transition_comment = transition_comment
 186              self.hook = hook
 187 
 188      class Transitions(dict):
 189          def __init__(self,*states):
 190              self.states = {}
 191              for s in states:
 192                  assert(not s.state in self)
 193                  self[s.state] = s
 194 
 195      status_graph = {'new' : Transitions(State('submitting','j.submit()',hook='monitorSubmitting_hook'),
 196                                          State('removed','j.remove()')),
 197                      'submitting' : Transitions(State('new','submission failed',hook='rollbackToNewState'),
 198                                                 State('submitted',hook='monitorSubmitted_hook'),
 199                                                 State('unknown','forced remove OR remote jobmgr error'),
 200                                                 State('failed','manually forced or keep_on_failed=True',hook='monitorFailed_hook')),
 201                      'submitted' : Transitions(State('running'),
 202                                                State('killed','j.kill()',hook='monitorKilled_hook'),
 203                                                State('unknown','forced remove'),
 204                                                State('failed', 'j.fail(force=1)',hook='monitorFailed_hook'),
 205                                                State('completing','job output already in outputdir',hook='postprocess_hook'),
 206                                                State('completed',hook='postprocess_hook'),
 207                                                State('submitting','j.resubmit(force=1)')),
 208                      'running' : Transitions(State('completing'),
 209                                              State('completed','job output already in outputdir',hook='postprocess_hook'),
 210                                              State('failed','backend reported failure OR j.fail(force=1)',hook='monitorFailed_hook'),
 211                                              State('killed','j.kill()',hook='monitorKilled_hook'),
 212                                              State('unknown','forced remove'),
 213                                              State('submitting','j.resubmit(force=1)'),
 214                                              State('submitted','j.resubmit(force=1)')),
 215                      'completing' : Transitions(State('completed',hook='postprocess_hook'),
 216                                                 State('failed','postprocessing error OR j.fail(force=1)',hook='postprocess_hook_failed'),
 217                                                 State('unknown','forced remove'),
 218                                                 State('submitting','j.resubmit(force=1)'),
 219                                                 State('submitted','j.resubmit(force=1)')),
 220                      'killed' : Transitions(State('removed','j.remove()'),
 221                                             State('failed','j.fail()'),
 222                                             State('submitting','j.resubmit()'),
 223                                             State('submitted','j.resubmit()')),
 224                      'failed' : Transitions(State('removed','j.remove()'),
 225                                             State('submitting','j.resubmit()'),
 226                                             State('submitted','j.resubmit()')),
 227                      'completed' : Transitions(State('removed','j.remove()'),
 228                                                State('failed','j.fail()'),
 229                                                State('submitting','j.resubmit()'),
 230                                                State('submitted','j.resubmit()')),
 231                      'incomplete' : Transitions(State('removed','j.remove()')),
 232                      'unknown' : Transitions(State('removed','forced remove')),
 233                      'template': Transitions(State('removed'))
 234                      }
 235 
 236      transient_states = ['incomplete','removed','unknown']
 237      initial_states = ['new','incomplete','template']
 238 
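
The nested dictionary above is what updateStatus() consults: a transition is legal exactly when the target state is a key of the Transitions object stored under the current state, and the stored State carries an optional hook name and a human-readable comment. A small sketch of that lookup, using only the classes defined above:

    current, target = 'running', 'completed'
    try:
        state = Job.status_graph[current][target]      # a Job.State instance
        print state.transition_comment, state.hook     # 'job output already in outputdir' 'postprocess_hook'
    except KeyError:
        print 'transition %s -> %s is forbidden' % (current, target)
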
 239      def updateStatus(self,newstatus, transition_update = True):
 240          """ Move the job to the new status, according to the state transition graph (Job.status_graph).
 241          If the transition is allowed:
 242            - call the specific transition hook (if it exists)
 243            - call the general transition update hook (triggers the auto merger and application hooks)
 244            - commit the job
 245          If the job cannot be committed, then revert to the old status and raise JobStatusError.
 246          If the transition is not allowed then raise JobStatusError.
 247 
 248          Transitions to the current state are allowed by default (so you can updateStatus('running') if the job is 'running').
 249          Such default transitions do not have hooks.
 250          """
 251          fqid = self.getFQID('.')
 252          logger.debug('attempt to change job %s status from "%s" to "%s"',fqid, self.status,newstatus)
 253 
 254          try:
 255              state = self.status_graph[self.status][newstatus]
 256          except KeyError:
 257              # allow default transitions: s->s, no hook
 258              if newstatus == self.status:
 259                  state = Job.State(newstatus)
 260              else:
 261                  raise JobStatusError('forbidden status transition of job %s from "%s" to "%s"'%(fqid, self.status,newstatus))
 262 
 263          self._getWriteAccess()
 264 
 265          saved_status = self.status
 266 
 267          try:
 268              if state.hook:
 269                  try:
 270                      getattr(self,state.hook)()
 271                  except PostprocessStatusUpdate,x:
 272                      newstatus = x.status
 273 
 274              if transition_update:
 275                  # we call this even if there was a hook
 276                  newstatus = self.transition_update(newstatus)
 277 
 278              if self.status != newstatus:
 279                  self.time.timenow(str(newstatus))
 280                  logger.debug("timenow('%s') called.", self.status)
 281              else:
 282                  logger.debug("Status changed from '%s' to '%s'. No new timestamp was written", self.status, newstatus)
 283 
 284              self.status = newstatus # move to the new state AFTER hooks are called
 285              self._commit()
 286          except Exception,x:
 287              self.status = saved_status
 288              log_user_exception()
 289              raise JobStatusError(x)
 290 
 291          logger.info('job %s status changed to "%s"',fqid,self.status)
292
293 - def transition_update(self,new_status):
294 """Propagate status transitions""" 295 try: 296 runAutoMerge(self, new_status) 297 except MergerError: 298 #stop recursion 299 new_status = 'failed' 300 self.updateStatus(new_status, transition_update = False) 301 302 #Propagate transition updates to applications 303 if self.application: 304 self.application.transition_update(new_status) 305 return new_status
306
307 - def updateMasterJobStatus(self):
308 """ 309 Update master job status based on the status of subjobs. 310 This is an auxiliary method for implementing bulk subjob monitoring. 311 """ 312 313 j = self 314 stats = [s.status for s in j.subjobs] 315 316 # ignore non-split jobs 317 if not stats: 318 logger.warning('ignoring master job status updated for job %s (NOT MASTER)',self.getFQID('.')) 319 return 320 321 new_stat = None 322 323 for s in ['submitting','submitted','running','failed','killed','completing','completed']: 324 if s in stats: 325 new_stat = s 326 break 327 328 if new_stat == j.status: 329 return 330 331 if not new_stat: 332 logger.critical('undefined state for job %d, stats=%s',j.id,str(stats)) 333 334 j.updateStatus(new_stat)
335
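
The loop above applies a simple precedence rule: the master job takes the "earliest" state (in the order listed) found among its subjobs. A minimal stand-alone illustration of that rule with a hypothetical set of subjob states:

    precedence = ['submitting','submitted','running','failed','killed','completing','completed']
    stats = ['completed', 'running', 'completed']    # hypothetical subjob states
    new_stat = None
    for s in precedence:
        if s in stats:
            new_stat = s
            break
    print new_stat                                   # -> 'running'
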
336 - def getMonitoringService(self):
339
340 - def monitorPrepare_hook(self, subjobconfig):
341 """Let monitoring services work with the subjobconfig after it's prepared""" 342 self.getMonitoringService().prepare(subjobconfig=subjobconfig)
343
344 - def monitorSubmitting_hook(self):
345 """Let monitoring services perform actions at job submission""" 346 self.getMonitoringService().submitting()
347
348 - def monitorSubmitted_hook(self):
349 """Send monitoring information (e.g. Dashboard) at the time of job submission""" 350 self.getMonitoringService().submit()
351
352 - def postprocess_hook(self):
355
356 - def postprocess_hook_failed(self):
359
360 - def monitorFailed_hook(self):
361 self.getMonitoringService().fail()
362
363 - def monitorKilled_hook(self):
364 self.getMonitoringService().kill()
365
366 - def monitorRollbackToNew_hook(self):
368
369 - def _auto__init__(self,registry=None):
370 if registry is None: 371 from Ganga.Core.GangaRepository import getRegistry 372 registry = getRegistry(self.default_registry) 373 374 self.info.uuid = Ganga.Utility.guid.uuid() 375 376 # register the job (it will also commit it) 377 # job gets its id now 378 registry._add(self) 379 self._init_workspace() 380 self._setDirty()
381 382
383 - def _init_workspace(self):
384 self.getDebugWorkspace(create=True)
385 386
387 - def getWorkspace(self,what,create=True):
388 import Ganga.Core.FileWorkspace 389 Workspace = getattr(Ganga.Core.FileWorkspace,what) 390 w = Workspace() 391 import os 392 w.jobid = self.getFQID(os.sep) 393 if create: 394 w.create(w.jobid) 395 return w
396
397 - def createPackedInputSandbox(self, files, master=False):
398 """ Create a packed input sandbox which contains files (a list of File or FileBuffer objects). 399 'master' flag is used to make a difference between the master and shared input sandbox which is important 400 if the job is not split (subjob and masterjob are the same object) 401 """ 402 403 import Ganga.Core.Sandbox as Sandbox 404 name = '_input_sandbox_'+self.getFQID('_')+'%s.tgz' 405 if master: 406 name = name % "_master" 407 else: 408 name = name % "" 409 410 return Sandbox.createPackedInputSandbox(files,self.getInputWorkspace(),name)
411
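
The sandbox tarball name is derived from the fully qualified job id, with a '_master' marker distinguishing the shared (master) sandbox from the per-job one. For example, for a subjob whose FQID is 2.0 the two variants come out as:

    name = '_input_sandbox_' + '2_0' + '%s.tgz'
    print name % '_master'    # _input_sandbox_2_0_master.tgz  (shared/master sandbox)
    print name % ''           # _input_sandbox_2_0.tgz         (per-subjob sandbox)
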
412 - def createInputSandbox(self, files, master=False):
413 """ Create an unpacked input sandbox which contains files (a list of File or FileBuffer objects). 414 'master' flag is not used in this case, and it provided only for uniformity with createPackedInputSandbox() method 415 """ 416 417 import Ganga.Core.Sandbox as Sandbox 418 return Sandbox.createInputSandbox(files,self.getInputWorkspace())
419
420 - def getStringFQID(self):
421 return self.getFQID('.')
422
423 - def getStringInputDir(self):
424 return self.getInputWorkspace(create=self.status != 'removed').getPath()
425
426 - def getStringOutputDir(self):
427 return self.getOutputWorkspace(create=self.status != 'removed').getPath()
428
429 - def getFQID(self,sep=None):
430 """Return a fully qualified job id (within registry): a list of ids [masterjob_id,subjob_id,...] 431 If optional sep is specified FQID string is returned, ids are separated by sep. 432 For example: getFQID('.') will return 'masterjob_id.subjob_id....' 433 """ 434 435 fqid = [self.id] 436 cur = self._getParent() # FIXME: or use master attribute? 437 while cur: 438 fqid.append(cur.id) 439 cur = cur._getParent() 440 fqid.reverse() 441 442 if sep: 443 return sep.join([str(id) for id in fqid]) 444 return fqid
445
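
From the GPI the string form produced here is exposed through the read-only 'fqid' schema attribute, while the list form is used internally. A sketch, assuming a hypothetical split job with registry id 42 in the 'jobs' registry:

    sj = jobs(42).subjobs[3]    # hypothetical fourth subjob of job 42
    print sj.fqid               # -> '42.3'  (string form, i.e. getFQID('.'))
    print sj.id                 # -> 3       (the subjob id alone)
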
446 - def getInputWorkspace(self,create=True):
447 return self.getWorkspace('InputWorkspace',create=create)
448
449 - def getOutputWorkspace(self,create=True):
450 return self.getWorkspace('OutputWorkspace',create=create)
451
452 - def getDebugWorkspace(self,create=True):
453 return self.getWorkspace('DebugWorkspace',create=create)
454
455 - def __getstate__(self):
456 dict = super(Job, self).__getstate__() 457 #FIXME: dict['_data']['id'] = 0 # -> replaced by 'copyable' mechanism in base class 458 dict['_registry'] = None 459 return dict
460
461 - def peek( self, filename = "", command = "" ):
462 ''' 463 Allow viewing of job output (and input) files 464 465 Arguments other than self: 466 filename : name of file to be viewed 467 => For backends where this is enabled, the filename 468 for a job in the "running" state is relative to 469 the job's work directory unless the filename begins 470 with "../". In all other cases, the filename is 471 relative to the job's output directory 472 command : command to be used for viewing the file 473 => If no command is given, then the command defined in 474 the [File_Associations] section of the Ganga 475 configuration file(s) is used 476 477 Example usage: 478 479 # examine contents of output/work directory 480 peek() 481 482 # examine contents of output directory, 483 # even in case of job in "running" state 484 peek( "../output" ) 485 486 # examine contents of input directory 487 peek( "../input" ) 488 489 # View ROOT histograms, running root.exe in a new terminal window 490 peek( "histograms.root", "root.exe &&" ) 491 492 # View contents of file in output/work directory, using 493 # command defined in configuration file 494 peek( "output.txt" ) 495 496 # View ROOT histograms in ouput/work directory, 497 # running root.exe in a new terminal window 498 peek( "histograms.root", "root.exe &&" ) 499 500 Return value: None 501 ''' 502 503 import os 504 pathStart = filename.split( os.sep )[ 0 ] 505 if ( ( "running" == self.status ) and ( pathStart != ".." ) ): 506 self.backend.peek( filename = filename, command = command ) 507 else: 508 topdir = os.path.dirname( self.inputdir.rstrip( os.sep ) ) 509 path = os.path.join( topdir, "output", filename ).rstrip( os.sep ) 510 self.viewFile( path = path, command = command ) 511 return None
512
513 - def viewFile( self, path = "", command = "" ):
514 ''' 515 Allow file viewing 516 517 Arguments other than self: 518 path : path to file to be viewed 519 command : command to be used for viewing the file 520 => If no command is given, then the command defined in 521 the [File_Associations] section of the Ganga 522 configuration file(s) is used 523 524 This is intended as a helper function for the peek() method. 525 526 Return value: None 527 ''' 528 529 import os 530 from Ganga.Utility.Config import ConfigError, getConfig 531 from exceptions import IndexError 532 config = getConfig( "File_Associations" ) 533 if os.path.exists( path ): 534 if os.path.islink( path ): 535 path = os.readlink( path ) 536 if not command: 537 if os.path.isdir( path ): 538 command = config[ "listing_command" ] 539 else: 540 suffix = os.path.splitext( path )[ 1 ].lstrip( "." ) 541 try: 542 command = config[ suffix ] 543 except ConfigError: 544 command = config[ "fallback_command" ] 545 546 mode = os.P_WAIT 547 548 try: 549 tmpList = command.split( "&&" ) 550 termCommand = tmpList[ 1 ] 551 if not termCommand: 552 termCommand = config[ "newterm_command" ] 553 exeopt = config[ "newterm_exeopt" ] 554 exeCommand = " ".join( [ tmpList[ 0 ], path ] ) 555 argList = [ termCommand, exeopt, exeCommand ] 556 mode = os.P_NOWAIT 557 except IndexError: 558 tmpList = command.split( "&" ) 559 if ( len( tmpList ) > 1 ): 560 mode = os.P_NOWAIT 561 command = tmpList[ 0 ] 562 argList = command.split() 563 argList.append( path ) 564 565 cmd = argList[ 0 ] 566 os.spawnvp( mode, cmd, argList ) 567 else: 568 logger.warning( "File/directory '%s' not found" % path ) 569 570 return None
571
572 - def submit(self,keep_going=None,keep_on_fail=None):
573 '''Submits a job. Return true on success. 574 575 First the application is configured which may generate 576 additional input files, preprocess executable scripts etc. 577 Then backend handler is used to submit the configured job. 578 The job is automatically checkpointed to persistent storage. 579 580 The default values of keep_going and keep_on_fail are controlled by [GPI_Semantics] configuration options. 581 582 When the submission fails the job status is automatically 583 reverted to new and all files in the input directory are 584 deleted (keep_on_fail=False is the default behaviour unless modified in configuration). 585 If keep_on_fail=True then the job status 586 is moved to the failed status and input directory is left intact. 587 This is helpful for debugging anf implements the request #43143. 588 589 For split jobs: consult https://twiki.cern.ch/twiki/bin/view/ArdaGrid/GangaSplitters#Subjob_submission 590 ''' 591 from Ganga.Utility.Config import ConfigError, getConfig 592 gpiconfig = getConfig('GPI_Semantics') 593 594 if keep_going is None: 595 keep_going = gpiconfig['job_submit_keep_going'] 596 597 if keep_on_fail is None: 598 keep_on_fail = gpiconfig['job_submit_keep_on_fail'] 599 600 from Ganga.Core import ApplicationConfigurationError, JobManagerError, IncompleteJobSubmissionError, GangaException 601 602 # make sure nobody writes to the cache during this operation 603 #job._registry.cache_writers_mutex.lock() 604 605 import inspect 606 supports_keep_going = 'keep_going' in inspect.getargspec(self.backend.master_submit)[0] 607 608 if keep_going and not supports_keep_going: 609 msg = 'job.submit(keep_going=True) is not supported by %s backend'%self.backend._name 610 logger.error(msg) 611 raise JobError(msg) 612 613 614 # can only submit jobs in a 'new' state 615 if self.status != 'new': 616 msg = "cannot submit job %s which is in '%s' state"%(self.getFQID('.'),self.status) 617 logger.error(msg) 618 raise JobError(msg) 619 620 assert(self.subjobs == []) 621 622 if self.master is not None: 623 msg = "Cannot submit subjobs directly." 624 logger.error(msg) 625 raise JobError(msg) 626 627 # select the runtime handler 628 from Ganga.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers 629 try: 630 rtHandler = allHandlers.get(self.application._name,self.backend._name)() 631 except KeyError,x: 632 msg = 'runtime handler not found for application=%s and backend=%s'%(self.application._name,self.backend._name) 633 logger.error(msg) 634 raise JobError(msg) 635 636 try: 637 logger.info("submitting job %d",self.id) 638 # prevent other sessions from submitting this job concurrently. 
Also calls _getWriteAccess 639 self.updateStatus('submitting') 640 641 try: 642 #NOTE: this commit is redundant if updateStatus() is used on the line above 643 self._commit() 644 except Exception,x: 645 msg = 'cannot commit the job %s, submission aborted'%self.id 646 logger.error(msg) 647 self.status = 'new' 648 raise JobError(msg) 649 650 self.getDebugWorkspace().remove(preserve_top=True) 651 652 appmasterconfig = self.application.master_configure()[1] # FIXME: obsoleted "modified" flag 653 # split into subjobs 654 # try: 655 if 1: 656 if self.splitter: 657 subjobs = self.splitter.validatedSplit(self) 658 #print "*"*80 659 #import sys 660 #subjobs[0].printTree(sys.stdout) 661 662 # EBKE changes 663 i = 0 664 # bug fix for #53939 -> first set id of the subjob and then append to self.subjobs 665 #self.subjobs = subjobs 666 #for j in self.subjobs: 667 for j in subjobs: 668 j.info.uuid = Ganga.Utility.guid.uuid() 669 j.status='new' 670 j.time.timenow('new') 671 j.id = i 672 i += 1 673 self.subjobs.append(j) 674 675 for j in self.subjobs: 676 j._init_workspace() 677 678 rjobs = self.subjobs 679 self._commit() 680 else: 681 rjobs = [self] 682 683 # configure the application of each subjob 684 appsubconfig = [ j.application.configure(appmasterconfig)[1] for j in rjobs ] #FIXME: obsoleted "modified" flag 685 appconfig = (appmasterconfig,appsubconfig) 686 687 # prepare the master job with the runtime handler 688 jobmasterconfig = rtHandler.master_prepare(self.application,appmasterconfig) 689 690 # prepare the subjobs with the runtime handler 691 jobsubconfig = [ rtHandler.prepare(j.application,s,appmasterconfig,jobmasterconfig) for (j,s) in zip(rjobs,appsubconfig) ] 692 693 # notify monitoring-services 694 self.monitorPrepare_hook(jobsubconfig) 695 696 # submit the job 697 try: 698 if supports_keep_going: 699 r = self.backend.master_submit(rjobs,jobsubconfig,jobmasterconfig,keep_going) 700 else: 701 r = self.backend.master_submit(rjobs,jobsubconfig,jobmasterconfig) 702 703 if not r: 704 raise JobManagerError('error during submit') 705 706 #FIXME: possibly should go to the default implementation of IBackend.master_submit 707 if self.subjobs: 708 for jobs in self.subjobs: 709 jobs.info.increment() 710 711 712 except IncompleteJobSubmissionError,x: 713 logger.warning('Not all subjobs have been sucessfully submitted: %s',x) 714 self.info.increment() 715 self.updateStatus('submitted') # FIXME: if job is not split, then default implementation of backend.master_submit already have set status to "submitted" 716 self._commit() # make sure that the status change goes to the repository, NOTE: this commit is redundant if updateStatus() is used on the line above 717 718 #send job submission message 719 from Ganga.Runtime.spyware import ganga_job_submitted 720 721 if len(self.subjobs) == 0: 722 ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "1", "0", "0") 723 else: 724 submitted_count = 0 725 for sj in self.subjobs: 726 if sj.status == 'submitted': 727 submitted_count+=1 728 729 ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "0", "1", str(submitted_count)) 730 731 return 1 732 except Exception,x: 733 if isinstance(x,GangaException): 734 log_user_exception(logger,debug = True) 735 logger.error(str(x)) 736 else: 737 log_user_exception(logger,debug = False) 738 739 if keep_on_fail: 740 self.updateStatus('failed') 741 else: 742 # revert to the new status 743 logger.error('%s ... 
reverting job %s to the new status', str(x), self.getFQID('.') ) 744 self.updateStatus('new') 745 raise JobError(x)
746 747
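
As the docstring notes, the defaults for the two optional arguments come from the [GPI_Semantics] configuration section; they are resolved with the same getConfig() call that submit() itself uses:

    from Ganga.Utility.Config import getConfig
    gpiconfig = getConfig('GPI_Semantics')
    print gpiconfig['job_submit_keep_going']      # default used when j.submit(keep_going=None)
    print gpiconfig['job_submit_keep_on_fail']    # default used when j.submit(keep_on_fail=None)
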
748 - def rollbackToNewState(self):
749 ''' 750 Rollback the job to the "new" state if submitting of job failed: 751 - cleanup the input and output workspace preserving the top dir(bug ##19434) 752 - do not remove debug directory 753 - cleanup subjobs 754 This method is used as a hook for submitting->new transition 755 @see updateJobStatus() 756 ''' 757 758 # notify monitoring-services 759 self.monitorRollbackToNew_hook() 760 761 self.getInputWorkspace().remove(preserve_top=True) 762 self.getOutputWorkspace().remove(preserve_top=True) 763 #notify subjobs 764 for sj in self.subjobs: 765 sj.application.transition_update("removed") 766 #delete subjobs 767 self.subjobs = [] 768 self._commit()
769 770
771 - def remove(self,force=False):
772 '''Remove the job. 773 774 If job has been submitted try to kill it first. Then remove 775 the file workspace associated with the job. 776 777 If force=True then remove job without killing it. 778 ''' 779 780 template = self.status=='template' 781 782 if template: 783 logger.info('removing template %d',self.id) 784 else: 785 logger.info('removing job %d',self.id) 786 787 if self.status == 'removed': 788 msg = 'job %d already removed'%self.id 789 logger.error(msg) 790 raise JobError(msg) 791 792 if self.status == 'completing': 793 msg = 'job %s is completing (may be downloading output), do force_status("failed") and then remove() again'%self.getFQID('.') 794 logger.error(msg) 795 raise JobError(msg) 796 797 if self.master: 798 msg = 'cannot remove subjob %s'%self.getFQID('.') 799 logger.info(msg) 800 raise JobError(msg) 801 802 try: 803 self._getWriteAccess() 804 except RegistryKeyError: 805 if self._registry: 806 self._registry._remove(self,auto_removed=1) 807 return 808 809 810 if self.status in ['submitted','running']: 811 try: 812 if not force: 813 self._kill(transition_update=False) 814 except GangaException,x: 815 log_user_exception(logger,debug = True) 816 except Exception,x: 817 log_user_exception(logger) 818 logger.warning('unhandled exception in j.kill(), job id=%d',self.id) 819 820 # incomplete or unknown jobs may not have valid application or backend objects 821 if not self.status in ['incomplete','unknown']: 822 # tell the backend that the job was removed 823 # this is used by Remote backend to remove the jobs remotely 824 if hasattr(self.backend,'remove'): #bug #44256: Job in state "incomplete" is impossible to remove 825 self.backend.remove() 826 827 # tell the application that the job was removed 828 try: 829 self.application.transition_update("removed") 830 for sj in self.subjobs: 831 sj.application.transition_update("removed") 832 except AttributeError: 833 # Some applications do not have transition_update 834 pass 835 836 if self._registry: 837 self._registry._remove(self,auto_removed=1) 838 839 self.status = 'removed' 840 841 if not template: 842 # remove the corresponding workspace files 843 844 # FIXME: this is a hack to remove the entire job directory, should be properly solved with the self.workspace() 845 from Ganga.Core.FileWorkspace import InputWorkspace 846 wsp = InputWorkspace() 847 wsp.subpath='' 848 wsp.jobid = self.id 849 850 def doit(f): 851 try: 852 f() 853 except OSError,x: 854 logger.warning('cannot remove file workspace associated with the job %d : %s',self.id,str(x))
855 856 doit(wsp.remove) 857 858 self.getDebugWorkspace(create=False).remove(preserve_top=False)
859 860
861 - def fail(self,force=False):
862 """Deprecated. Use force_status('failed') instead.""" 863 raise JobError('fail() method is deprecated, use force_status("failed") instead.')
 864 
 865      allowed_force_states = { 'completed' : ['completing','failed'],
 866                               'failed' : ["submitting","completing","completed","submitted","running","killed"] }
 867 
868 - def force_status(self,status,force=False):
869 ''' Force job to enter the "failed" or "completed" state. This may be 870 used for marking jobs "bad" jobs or jobs which are stuck in one of the 871 internal ganga states (e.g. completing). 872 873 To see the list of allowed states do: job.force_status(None) 874 ''' 875 876 if status is None: 877 logger.info("The following states may be forced") 878 revstates = {} 879 for s1 in Job.allowed_force_states: 880 for s2 in Job.allowed_force_states[s1]: 881 revstates.setdefault(s2,{}) 882 revstates[s2][s1] = 1 883 884 for s in revstates: 885 logger.info("%s => %s"%(s,revstates[s].keys())) 886 return 887 888 if self.status == status: 889 return 890 891 if not status in Job.allowed_force_states: 892 raise JobError('force_status("%s") not allowed. Job may be forced to %s states only.'%(status,Job.allowed_force_states.keys())) 893 894 if not self.status in Job.allowed_force_states[status]: 895 raise JobError('Only a job in one of %s may be forced into "failed" (job %s)'%(str(Job.allowed_force_states[status]),self.getFQID('.'))) 896 897 if not force: 898 if self.status in ['submitted','running']: 899 try: 900 self._kill(transition_update=False) 901 except JobError,x: 902 x.what += "Use force_status('%s',force=True) to ignore kill errors."%status 903 raise x 904 try: 905 self.updateStatus(status) 906 logger.info('Job %s forced to status "%s"',self.getFQID('.'),status) 907 except JobStatusError,x: 908 logger.error(x) 909 raise x
910
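
A sketch of the forced-state workflow described above, assuming a hypothetical job with registry id 7 that is stuck in the 'completing' state:

    j = jobs(7)
    j.force_status(None)        # print the table of states that may be forced, and from where
    j.force_status('failed')    # allowed, since 'completing' is in allowed_force_states['failed']
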
911 - def kill(self):
912 '''Kill the job. Raise JobError exception on error. 913 ''' 914 self._kill(transition_update=True)
915
916 - def _kill(self,transition_update):
917 '''Private helper. Kill the job. Raise JobError exception on error. 918 ''' 919 try: 920 from Ganga.Core import GangaException 921 922 self._getWriteAccess() 923 # make sure nobody writes to the cache during this operation 924 #job._registry.cache_writers_mutex.lock() 925 926 fqid = self.getFQID('.') 927 logger.info('killing job %s',fqid) 928 if not self.status in ['submitted','running']: 929 msg = "cannot kill job which is in '%s' state. "%self.status 930 logger.error(msg) 931 raise JobError(msg) 932 try: 933 if self.backend.master_kill(): 934 self.updateStatus('killed',transition_update=transition_update) 935 936 ############ 937 # added as part of typestamp prototype by Justin 938 j = self.getJobObject() 939 if j.subjobs: 940 for jobs in j.subjobs: 941 if jobs.status not in ['failed', 'killed', 'completed']: ## added this 10/8/2009 - now only kills subjobs which aren't finished. 942 jobs.updateStatus('killed',transition_update=transition_update) 943 # 944 ############ 945 946 self._commit() 947 948 return True 949 else: 950 msg = "backend.master_kill() returned False" 951 raise JobError(msg) 952 except GangaException,x: 953 msg = "failed to kill job %s: %s"%(fqid,str(x)) 954 logger.error(msg) 955 raise JobError(msg) 956 finally: 957 pass #job._registry.cache_writers_mutex.release()
958
959 - def resubmit(self,backend=None):
960 """Resubmit a failed or completed job. A backend object may 961 be specified to change some submission parameters (which 962 parameters may be effectively changed depends on a 963 particular backend implementation). 964 965 Example: 966 b = j.backend.copy() 967 b.CE = 'some CE' 968 j.resubmit(backend=b) 969 970 Note: it is not possible to change the type of the backend in this way. 971 972 """ 973 return self._resubmit(backend=backend)
974
975 - def auto_resubmit(self):
976 """ Private method used for auto resubmit functionality by the monitoring loop. 977 """ 978 return self._resubmit(auto_resubmit=True)
979 980
981 - def _resubmit(self,backend=None,auto_resubmit=False):
982 """ Internal implementation of resubmit which handles the publically accessible resubmit() method proper as well 983 as the auto_resubmit use case used in the monitoring loop. 984 """ 985 # there are possible two race condition which must be handled somehow: 986 # - a simple job is monitorable and at the same time it is 'resubmitted' - potentially the monitoring may update the status back! 987 # - same for the master job... 988 989 from Ganga.Core import GangaException, IncompleteJobSubmissionError, JobManagerError 990 991 fqid = self.getFQID('.') 992 logger.info('resubmitting job %s',fqid) 993 994 if backend and auto_resubmit: 995 msg = "job %s: cannot change backend when auto_resubmit=True. This is most likely an internal implementation problem."%fqid 996 logger.error(msg) 997 raise JobError(msg) 998 999 #the status check is disabled when auto_resubmit 1000 if not self.status in ['completed','failed','killed'] and not auto_resubmit: 1001 msg = "cannot resubmit job %s which is in '%s' state"%(fqid,self.status) 1002 logger.error(msg) 1003 raise JobError(msg) 1004 1005 backendProxy = backend 1006 backend = None 1007 1008 if backendProxy: 1009 backend = backendProxy._impl 1010 1011 # do not allow to change the backend type 1012 if backend and self.backend._name != backend._name: 1013 msg = "cannot resubmit job %s: change of the backend type is not allowed"%fqid 1014 logger.error(msg) 1015 raise JobError(msg) 1016 1017 # if the backend argument is identical (no attributes changed) then it is equivalent to None 1018 # the good side effect is that in this case we don't require any backend resubmit method to support 1019 # the extra backend argument 1020 if backend == self.backend: 1021 backend = None 1022 1023 # check if the backend supports extra 'backend' argument for master_resubmit() 1024 import inspect 1025 supports_master_resubmit = len(inspect.getargspec(self.backend.master_resubmit)[0])>1 1026 1027 if not supports_master_resubmit and backend: 1028 raise JobError('%s backend does not support changing of backend parameters at resubmission (optional backend argument is not supported)'%self.backend._name) 1029 1030 def check_changability(obj1,obj2): 1031 # check if the only allowed attributes have been modified 1032 for name,item in obj1._schema.allItems(): 1033 v1 = getattr(obj1,name) 1034 v2 = getattr(obj2,name) 1035 if not item['changable_at_resubmit'] and item['copyable']: 1036 if v1 != v2: 1037 raise JobError('%s.%s attribute cannot be changed at resubmit'%(obj1._name,name)) 1038 if item.isA('ComponentItem'): 1039 check_changability(v1,v2)
1040 1041 if backend: 1042 check_changability(self.backend,backend) 1043 1044 oldstatus = self.status 1045 1046 self.updateStatus('submitting') 1047 1048 try: 1049 self._commit() 1050 except Exception,x: 1051 msg = 'cannot commit the job %s, resubmission aborted'%fqid 1052 logger.error(msg) 1053 self.status = oldstatus 1054 raise JobError(msg) 1055 1056 self.getDebugWorkspace().remove(preserve_top=True) 1057 1058 try: 1059 rjobs = self.subjobs 1060 if not rjobs: 1061 rjobs = [self] 1062 elif auto_resubmit: # get only the failed jobs for auto resubmit 1063 rjobs = [s for s in rjobs if s.status in ['failed'] ] 1064 1065 if rjobs: 1066 for sjs in rjobs: 1067 sjs.info.increment() 1068 sjs.getOutputWorkspace().remove(preserve_top=True) #bugfix: #31690: Empty the outputdir of the subjob just before resubmitting it 1069 1070 try: 1071 if auto_resubmit: 1072 result = self.backend.master_auto_resubmit(rjobs) 1073 else: 1074 if backend is None: 1075 result = self.backend.master_resubmit(rjobs) 1076 else: 1077 result = self.backend.master_resubmit(rjobs,backend=backend) 1078 1079 if not result: 1080 raise JobManagerError('error during submit') 1081 except IncompleteJobSubmissionError,x: 1082 logger.warning('Not all subjobs of job %s have been sucessfully re-submitted: %s',fqid,x) 1083 1084 #self.info.increment() #commented out to fix bug 77962 1085 1086 if self.subjobs: 1087 for sjs in self.subjobs: 1088 sjs.time.timenow('resubmitted') 1089 else: 1090 self.time.timenow('resubmitted') 1091 1092 self.status = 'submitted' # FIXME: if job is not split, then default implementation of backend.master_submit already have set status to "submitted" 1093 self._commit() # make sure that the status change goes to the repository 1094 1095 #send job submission message 1096 from Ganga.Runtime.spyware import ganga_job_submitted 1097 1098 #if resubmit on subjob 1099 if fqid.find('.') > 0: 1100 ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "0", "0", "1") 1101 #if resubmit on plain job 1102 elif len(self.subjobs) == 0: 1103 ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "1", "0", "0") 1104 #else resubmit on master job -> increment the counter of the subjobs with the succesfull resubmited subjobs 1105 else: 1106 submitted_count = 0 1107 for sj in self.subjobs: 1108 if sj.status == 'submitted': 1109 submitted_count+=1 1110 1111 ganga_job_submitted(self.application.__class__.__name__, self.backend.__class__.__name__, "0", "0", str(submitted_count)) 1112 1113 1114 except GangaException,x: 1115 logger.error("failed to resubmit job, %s" % (str(x),)) 1116 logger.warning('reverting job %s to the %s status', fqid, oldstatus ) 1117 self.status = oldstatus 1118 self._commit() #PENDING: what to do if this fails? 1119 raise 1120
1121 - def _commit(self,objects=None):
1122 """ Helper method to unconditionally commit to the repository. The 'objects' list specifies objects 1123 to be commited (for example the subjobs). If objects are not specified then just the self is commited """ 1124 1125 if objects is None: 1126 objects = [self] 1127 # EBKE changes 1128 objects = [self._getRoot()] 1129 reg = self._getRegistry() 1130 if not reg is None: 1131 reg._flush(objects)
1132 1133
1134 - def _attribute_filter__set__(self,n,v):
1135 # a workaround for bug #8111 1136 ## if n == 'name': 1137 ## if len(v)>0 and not v.isalnum(): 1138 ## raise ValueError('%s: the job name may contain only numbers and letters (a temporary workaround for bug #8111)'%v) 1139 1140 return v
1141
1142 - def _repr(self):
1143 if self.id is None: 1144 id = "None" 1145 else: 1146 id = self.getFQID('.') 1147 #id = self.fully_qualified_id() 1148 #if len(id)==1: id = id[0] 1149 #id = str(id) 1150 #id = id.replace(' ','') 1151 return "%s#%s"%(str(self.__class__.__name__),id)
1152 1153 ## def fully_qualified_id(j): 1154 ## index = [] 1155 ## while j: 1156 ## index.append(j.id) 1157 ## j = j.master 1158 ## index.reverse() 1159 ## return tuple(index) 1160
1161 - def merge(self, subjobs = None, sum_outputdir = None, **options):
1162 '''Merge the output of subjobs. 1163 1164 By default merge all subjobs into the master outputdir. 1165 The output location and the list of subjobs may be overriden. 1166 The options (keyword arguments) are passed onto the specific merger implementation. 1167 Refer to the specific merger documentation for more information about available options. 1168 ''' 1169 1170 self._getWriteAccess() 1171 1172 #for backward compatibility if the arguments are not passed correctly -> switch them 1173 if (subjobs is not None and isStringLike(subjobs)) or (sum_outputdir is not None and not isStringLike(sum_outputdir)): 1174 #switch the arguments 1175 temp = subjobs 1176 subjobs = sum_outputdir 1177 sum_outputdir = temp 1178 logger.warning('Deprecated use of job.merge(sum_outputdir, subjobs), swap your arguments, will break in the future, it should be job.merge(subjobs, sum_outputdir)') 1179 1180 if sum_outputdir is None: 1181 sum_outputdir = self.outputdir 1182 1183 if subjobs is None: 1184 subjobs = self.subjobs 1185 1186 try: 1187 if self.merger: 1188 self.merger.merge(subjobs, sum_outputdir, **options) 1189 else: 1190 logger.warning('Cannot merge job %d: merger is not defined'%self.id) 1191 except Exception,x: 1192 log_user_exception() 1193 raise
1194
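
Since merging is only triggered explicitly (or via runAutoMerge on a status transition), the usual pattern is to attach a merger when the job is defined and call merge() on the master job once the subjobs have finished. A sketch, assuming the standard ArgSplitter and TextMerger plugins are available in the GPI session:

    j = Job()
    j.splitter = ArgSplitter(args=[['1'], ['2'], ['3']])
    j.merger = TextMerger(files=['stdout'])
    j.submit()
    # ... later, once the subjobs have completed ...
    j.merge()                                # merge all subjobs into the master outputdir
    j.merge(j.subjobs[0:2], '/tmp/partial')  # explicit subjob list and output location
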
1195 - def _subjobs_proxy(self):
1196 from Ganga.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, _wrap 1197 subjobs = JobRegistrySlice('jobs(%d).subjobs'%self.id) 1198 for j in self.subjobs: 1199 subjobs.objects[j.id] = j 1200 #print 'return slice',subjobs 1201 return _wrap(subjobs)
1202
1203 - def _subjobs_summary_print(self,value,verbosity_level):
1204 rslice = self._subjobs_proxy() 1205 return rslice._display(1)
1206 1207
1208  class JobTemplate(Job):
1209      """A placeholder for Job configuration parameters.
1210 
1211      JobTemplates are normal Job objects but they are never submitted. They have their own JobRegistry, so they do not get mixed up with
1212      normal jobs. They always have a "template" status.
1213 
1214      Create a job with an existing job template t:
1215 
1216           j = Job(t)
1217 
1218      Save a job j as a template t:
1219 
1220           t = JobTemplate(j)
1221 
1222      You may save commonly used job parameters in a template and create new jobs more easily and quickly.
1223      """
1224      _category = 'jobs'
1225      _name = 'JobTemplate'
1226 
1227      _schema = Job._schema.inherit_copy()
1228      _schema.datadict["status"] = SimpleItem('template',protected=1,checkset='_checkset_status',doc='current state of the job, one of "new", "submitted", "running", "completed", "killed", "unknown", "incomplete"')
1229 
1230      default_registry = 'templates'
1231 
1232 - def __init__(self):
1233 super(JobTemplate, self).__init__() 1234 self.status = "template"
1235
1236 - def _readonly(self):
1237 return 0
1238 1239 # FIXME: for the moment you have to explicitly define all methods if you want to export them...
1240 - def remove(self,force=False):
1241 '''See Job for documentation. The force optional argument has no effect (it is provided for the compatibility with Job interface)''' 1242 return super(JobTemplate,self).remove()
1243
1244 - def submit(self):
1245 """ Templates may not be submitted, return false.""" 1246 return super(JobTemplate,self).submit()
1247
1248 - def kill(self):
1249 """ Templates may not be killed, return false.""" 1250 return super(JobTemplate,self).kill()
1251 1252 1253 # 1254 # 1255 # $Log: Job.py,v $ 1256 # Revision 1.10 2009/02/24 14:59:34 moscicki 1257 # when removing jobs which are in the "incomplete" or "unknown" status, do not trigger callbacks on application and backend -> they may be missing! 1258 # 1259 # Revision 1.12.2.5 2009/07/14 15:09:37 ebke 1260 # Missed fix 1261 # 1262 # Revision 1.12.2.4 2009/07/13 22:10:52 ebke 1263 # Update for the new GangaRepository: 1264 # * Moved dict interface from Repository to Registry 1265 # * Clearly specified Exceptions to be raised by Repository 1266 # * proper exception handling in Registry 1267 # * moved _writable to _getWriteAccess, introduce _getReadAccess 1268 # * clarified locking, logic in Registry, less in Repository 1269 # * index reading support in XML (no writing, though..) 1270 # * general index reading on registry.keys() 1271 # 1272 # Revision 1.12.2.3 2009/07/10 13:30:06 ebke 1273 # Fixed _commit not commiting the root object 1274 # 1275 # Revision 1.12.2.2 2009/07/10 11:30:34 ebke 1276 # Remove reference to _data in status in preparation for lazy loading 1277 # 1278 # Revision 1.12.2.1 2009/07/08 11:18:21 ebke 1279 # Initial commit of all - mostly small - modifications due to the new GangaRepository. 1280 # No interface visible to the user is changed 1281 # 1282 # Revision 1.12 2009/05/20 12:35:40 moscicki 1283 # debug directory (https://savannah.cern.ch/bugs/?50305) 1284 # 1285 # Revision 1.11 2009/05/20 09:23:46 moscicki 1286 # debug directory (https://savannah.cern.ch/bugs/?50305) 1287 # 1288 # Revision 1.10 2009/02/24 14:59:34 moscicki 1289 # when removing jobs which are in the "incomplete" or "unknown" status, do not trigger callbacks on application and backend -> they may be missing! 1290 # 1291 # Revision 1.9 2009/02/02 12:54:55 moscicki 1292 # bugfix: bug #45679: j.application.transition_update("removed") is not called on j.remove() 1293 # 1294 # Revision 1.8 2008/11/21 13:45:58 moscicki 1295 # #bug #44256: Job in state "incomplete" is impossible to remove 1296 # 1297 # Revision 1.7 2008/11/07 12:39:53 moscicki 1298 # added j.submit(keep_on_fail=False) option (request #43143) 1299 # 1300 # Revision 1.6 2008/10/02 10:31:05 moscicki 1301 # bugfix #41372: added backend.remove() method to support job removal on the Remote backend 1302 # 1303 # Revision 1.5 2008/09/09 14:51:14 moscicki 1304 # bug #40696: Exception raised during resubmit() should be propagated to caller 1305 # 1306 # Revision 1.4 2008/09/09 14:37:16 moscicki 1307 # bugfix #40220: Ensure that default values satisfy the declared types in the schema 1308 # 1309 # factored out type checking into schema module, fixed a number of wrongly declared schema items in the core 1310 # 1311 # Revision 1.3 2008/08/18 13:18:58 moscicki 1312 # added force_status() method to replace job.fail(), force_job_failed() and 1313 # force_job_completed() 1314 # 1315 # Revision 1.2 2008/08/04 14:28:20 moscicki 1316 # bugfix: #39655: Problem with slice operations on templates 1317 # 1318 # REMOVED TABS 1319 # 1320 # Revision 1.1 2008/07/17 16:40:54 moscicki 1321 # migration of 5.0.2 to HEAD 1322 # 1323 # the doc and release/tools have been taken from HEAD 1324 # 1325 # Revision 1.62.4.17 2008/04/21 08:46:51 wreece 1326 # Imports missing symbol. 
test Ganga/test/Bugs/Savannah28511 now passes 1327 # 1328 # Revision 1.62.4.16 2008/04/18 13:42:02 moscicki 1329 # remove obsolete printout 1330 # 1331 # Revision 1.62.4.15 2008/04/18 07:06:24 moscicki 1332 # bugfix 13404 (reintroduced in 5) 1333 # 1334 # Revision 1.62.4.14 2008/04/02 11:29:35 moscicki 1335 # inputdir and outputdir are not stored persistently anymore but are calculated wrt to the current workspace configuration 1336 # 1337 # this makes it easier to relocate local repository and, in the future, to implement local workspace cache for remote repository/workspace 1338 # 1339 # Revision 1.62.4.13 2008/03/17 19:38:40 roma 1340 # bug fix #28511 1341 # 1342 # Revision 1.62.4.12 2008/03/06 14:10:35 moscicki 1343 # added warning if fail() fails 1344 # 1345 # Revision 1.62.4.11 2008/03/04 10:23:43 amuraru 1346 # fixed rollbackToNewStatu 1347 # 1348 # Revision 1.62.4.10 2008/03/03 14:57:06 amuraru 1349 # fixed the state transition when merger fails 1350 # 1351 # Revision 1.62.4.9 2008/02/29 10:27:21 moscicki 1352 # fixes in Job._kill() method 1353 # 1354 # added fail() and remove() method in GPI slices (not all keywords implemented yet) 1355 # 1356 # Revision 1.62.4.8 2008/02/28 15:48:11 moscicki 1357 # cannot submit subjobs directly 1358 # 1359 # Revision 1.62.4.7 2008/02/28 13:07:37 moscicki 1360 # added fail() and improved behaviour of kill() 1361 # improved logging messages 1362 # removed race condition in status update hook 1363 # 1364 # Revision 1.62.4.6 2007/12/19 16:27:17 moscicki 1365 # remove(): ignore kill failures 1366 # fixed messaging on kill 1367 # 1368 # Revision 1.62.4.5 2007/12/18 09:06:42 moscicki 1369 # integrated typesystem from Alvin 1370 # 1371 # Revision 1.62.4.4 2007/12/10 17:29:01 amuraru 1372 # merged changes from Ganga 4.4.4 1373 # 1374 # Revision 1.62.4.3 2007/11/13 18:37:26 wreece 1375 # Merges head change in Job with branch. Fixes warnings in Mergers. Merges MergerTests with head. Adds new test to GangaList. Fixes config problems in Root. 1376 # 1377 # Revision 1.62.4.2 2007/11/08 11:56:44 moscicki 1378 # pretty print for subjobs added 1379 # 1380 # Revision 1.62.4.1 2007/11/07 17:02:12 moscicki 1381 # merged against Ganga-4-4-0-dev-branch-kuba-slices with a lot of manual merging 1382 # 1383 # Revision 1.65 2007/12/04 14:11:02 moscicki 1384 # added submitted->submitting transition to fix problems with master job status in case of subjob resubmission 1385 # 1386 # Revision 1.64 2007/10/19 15:24:40 amuraru 1387 # allow *->submitting transition in order to support resubmission (hurng-chun) 1388 # 1389 # Revision 1.63 2007/10/09 13:05:17 moscicki 1390 # JobInfo object from Vladimir 1391 # Savannah #30045 bugfix from Will (merger recursive updateStatus() call) 1392 # 1393 # Revision 1.62 2007/09/12 16:40:46 amuraru 1394 # use the internal logger when in log_user_exception 1395 # 1396 # Revision 1.61 2007/07/27 15:13:38 moscicki 1397 # merged the monitoring services branch from kuba 1398 # 1399 # Revision 1.60 2007/07/27 13:52:00 moscicki 1400 # merger updates from Will (from GangaMergers-1-0 branch) 1401 # 1402 # Revision 1.59 2007/07/10 13:08:30 moscicki 1403 # docstring updates (ganga devdays) 1404 # 1405 # Revision 1.58.2.1 2007/05/14 13:32:11 wreece 1406 # Adds the merger related code on a seperate branch. All tests currently 1407 # run successfully. 
1408 # 1409 # Revision 1.58 2007/05/11 13:26:05 moscicki 1410 # fix in the state transition to support: 1411 # 1412 # temporary functions to help getting jobs out of completing and submitting states: 1413 # force_job_completed(j): may be applied to completing jobs 1414 # force_job_failed(j): may be applied to submitting or completing jobs 1415 # 1416 # Revision 1.57.4.2 2007/06/18 14:35:20 moscicki 1417 # mergefrom_HEAD_18Jun07 1418 # 1419 # Revision 1.57.4.1 2007/06/18 10:16:34 moscicki 1420 # slices prototype 1421 # 1422 # Revision 1.58 2007/05/11 13:26:05 moscicki 1423 # fix in the state transition to support: 1424 # 1425 # temporary functions to help getting jobs out of completing and submitting states: 1426 # force_job_completed(j): may be applied to completing jobs 1427 # force_job_failed(j): may be applied to submitting or completing jobs 1428 # 1429 # Revision 1.57 2007/04/20 17:29:34 moscicki 1430 # re-enabled the removal of subjobs table in the repository on job submission failure (revert to new status) 1431 # 1432 # Revision 1.56 2007/03/29 16:59:20 moscicki 1433 # exception handling fixed 1434 # 1435 # Revision 1.55 2007/03/26 16:14:36 moscicki 1436 # - changed formatting of exception messages 1437 # - fix removing the subjobs and reseting the counter when the split job submission fails (Ganga/test/GPI/CrashMultipleSubmitSubjobs.gpi) - TO BE CHECKED! 1438 # 1439 # Revision 1.54 2007/02/28 18:16:56 moscicki 1440 # support for generic: self.application.postprocess() 1441 # 1442 # removed JobManager from resubmit() and kill() 1443 # 1444 # Revision 1.53 2007/02/22 13:46:10 moscicki 1445 # simplification of the internal code: removal of JobManager and ApplicationManager 1446 # 1447 # bugfix 23737 1448 # 1449 # Revision 1.52 2007/01/25 16:18:21 moscicki 1450 # mergefrom_Ganga-4-2-2-bugfix-branch_25Jan07 (GangaBase-4-14) 1451 # 1452 # Revision 1.51 2006/10/26 16:27:24 moscicki 1453 # explicit subjob support (Alexander Soroko) 1454 # 1455 # Revision 1.50.2.3 2006/12/14 18:21:16 kuba 1456 # monitoring hook for job.submit() 1457 # 1458 # Revision 1.50.2.2 2006/11/24 14:31:39 amuraru 1459 # implementation of peek() function 1460 # 1461 # Revision 1.50.2.1 2006/11/02 09:27:10 amuraru 1462 # Fixed [bug #21225] 1463 # 1464 # Revision 1.50 2006/10/22 22:31:54 adim 1465 # allow manual subjob submmission (relaxed the checks in Job.submit() changing the assertion with a warning) 1466 # 1467 # Revision 1.49 2006/10/19 12:34:18 adim 1468 # *** empty log message *** 1469 # 1470 # Revision 1.48 2006/10/19 12:33:35 adim 1471 # *** empty log message *** 1472 # 1473 # Revision 1.47 2006/10/19 12:05:32 adim 1474 # allow manual subjob submmission (relaxed the checks in Job.submit() changing the assertion with a warning) 1475 # 1476 # Revision 1.46 2006/10/03 10:41:27 moscicki 1477 # log user exceptions in job state transitions 1478 # 1479 # Revision 1.45 2006/10/02 14:48:49 moscicki 1480 # make a difference between master and sub packed input sandbox in case there is not splitting (they were overriding each other) 1481 # 1482 # Revision 1.44 2006/09/19 09:40:33 adim 1483 # Bug fix #17080 1484 # 1485 # Revision 1.43 2006/09/08 13:03:08 adim 1486 # Fixed Bug#19434 1487 # Added a rollback hook for the submitting->new transition (activated when 1488 # submission fails) which cleans up the Input and Output workspaces. 
1489 # 1490 # Revision 1.42 2006/08/29 12:54:02 moscicki 1491 # trivial fix 1492 # 1493 # Revision 1.41 2006/08/29 12:03:26 moscicki 1494 # - createInputSandbox() 1495 # - fixes in resubmit() 1496 # 1497 # Revision 1.40 2006/08/24 16:58:26 moscicki 1498 # added createPackedInputSandbox() 1499 # 1500 # Revision 1.39 2006/08/11 13:35:10 moscicki 1501 # preliminary resubmit implementation 1502 # 1503 # Revision 1.38 2006/08/02 08:28:29 moscicki 1504 # id of subjobs starts with 0 not 1 1505 # 1506 # Revision 1.37 2006/08/01 09:28:57 moscicki 1507 # updated state list 1508 # 1509 # Revision 1.36 2006/07/31 12:15:43 moscicki 1510 # updateMasterJobStatus() helper method for bulk subjob monitoring 1511 # more transitions from submitted added 1512 # 1513 # Revision 1.35 2006/07/28 15:01:32 moscicki 1514 # small bugfix 1515 # 1516 # Revision 1.34 2006/07/28 12:52:37 moscicki 1517 # default self transitions enabled (s->s) 1518 # improved FQID (removed broken caching) 1519 # use FQID for logging messages 1520 # comments 1521 # 1522 # Revision 1.33 2006/07/27 20:13:46 moscicki 1523 # - JobStatusError 1524 # - added simple job state machine 1525 # - status has "checkset" metaproperty 1526 # - updateStatus() 1527 # - postprocessing_hook() 1528 # - getInputWorkspace(), getOutputWorkspace() 1529 # - getFQID() 1530 # - changed subjob ids (short numbers) 1531 # - fixed commit (suboptimal) 1532 # - changes to exception handling 1533 # 1534 # Revision 1.32 2006/07/10 13:24:41 moscicki 1535 # changes from Johannes: outputdata handling 1536 # 1537 # exception fixes... 1538 # 1539 # Revision 1.31 2006/06/13 12:34:29 moscicki 1540 # _category = "mergers" # plural -> following other category names 1541 # 1542 # Job.merge(self, sum_outputdir = None , subjobs = None, **options) 1543 # -> see the docstring for more details (logfile is passed in options) 1544 # 1545 # Revision 1.30 2006/06/13 08:50:04 moscicki 1546 # Added merger 1547 # 1548 # Revision 1.29 2006/06/09 14:31:51 moscicki 1549 # exception fix 1550 # 1551 # Revision 1.28 2006/02/13 15:19:14 moscicki 1552 # support for two-phase confguration (...,master config, splitting, sub config,...) 1553 # 1554 # Revision 1.27 2006/02/10 14:23:13 moscicki 1555 # fixed: bug #14524 overview: jobs[ id ].remove() doesn't delete top level job directory 1556 # 1557 # Revision 1.26 2005/12/08 12:01:03 moscicki 1558 # _init_workspace() method (this is a temporary name) 1559 # inputdir/outputdir of subjobs now point to the real directories (TODO: which are still not in a correct place in the filesystem) 1560 # 1561 # Revision 1.25 2005/12/02 15:30:35 moscicki 1562 # schema changes: master, subjobs, splitter properties 1563 # splitting support 1564 # customizable _repr() method 1565 # 1566 # Revision 1.24 2005/11/14 10:34:16 moscicki 1567 # added running state, GUI prefs 1568 # 1569 # 1570 # Revision 1.23 2005/10/21 13:17:56 moscicki 1571 # bufix #12475 (killing job in a running state) 1572 # 1573 # Revision 1.22.2.1 2005/11/04 11:40:12 ctan 1574 # *** empty log message *** 1575 # 1576 # Revision 1.22 2005/09/23 09:30:17 moscicki 1577 # minor 1578 # 1579 # Revision 1.21 2005/09/19 10:57:31 asaroka 1580 # Restriction on job name format is removed as redundant. 
1581 # 1582 # Revision 1.20 2005/08/29 10:01:36 moscicki 1583 # added docs 1584 # 1585 # Revision 1.19 2005/08/26 09:55:49 moscicki 1586 # outputsandbox property, many comments added 1587 # 1588 # Revision 1.18 2005/08/24 15:33:50 moscicki 1589 # added docstrings 1590 # 1591 # Revision 1.17 2005/08/23 17:09:27 moscicki 1592 # minor changes 1593 # 1594 # 1595 # 1596