Home | Trees | Indices | Help |
---|
|
1 2 from common import * 3 from sets import Set 4 from TaskApplication import ExecutableTask, taskApp 5 from Ganga.GPIDev.Lib.Job.Job import JobError 6 from Ganga.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy 79 _schema = Schema(Version(1,0), { 10 'status' : SimpleItem(defvalue='new', protected=1, copyable=0, doc='Status - running, pause or completed', typelist=["str"]), 11 'name' : SimpleItem(defvalue='Simple Transform', doc='Name of the transform (cosmetic)', typelist=["str"]), 12 'application' : ComponentItem('applications', defvalue=None, optional=1, load_default=False, filter="checkTaskApplication",doc='Application of the Transform. Must be a Task-Supporting application.'), 13 'inputsandbox' : FileItem(defvalue=[],typelist=['str','Ganga.GPIDev.Lib.File.File.File'],sequence=1,doc="list of File objects shipped to the worker node "), 14 'outputsandbox' : SimpleItem(defvalue=[],typelist=['str'],sequence=1,doc="list of filenames or patterns shipped from the worker node"), 15 'inputdata' : ComponentItem('datasets', defvalue=None, optional=1, load_default=False,doc='Input dataset'), 16 'outputdata' : ComponentItem('datasets', defvalue=None, optional=1, load_default=False,doc='Output dataset'), 17 'backend' : ComponentItem('backends', defvalue=None, optional=1,load_default=False, doc='Backend of the Transform.'), 18 'run_limit' : SimpleItem(defvalue=4, doc='Number of times a partition is tried to be processed.', protected=1, typelist=["int"]), 19 '_partition_status': SimpleItem(defvalue={}, hidden=1, copyable=0, doc='Map (only necessary) partitions to their status'), 20 '_app_partition' : SimpleItem(defvalue={}, hidden=1, copyable=0,doc='Map of applications to partitions'), 21 '_app_status' : SimpleItem(defvalue={}, hidden=1, copyable=0, doc='Map of applications to status'), 22 '_next_app_id' : SimpleItem(defvalue=0, hidden=1, copyable=0, doc='Next ID used for the application', typelist=["int"]), 23 }) 24 25 _category = 'transforms' 26 _name = 'Transform' 27 _exportmethods = [ 28 'run', 'pause', # Operations 29 'setPartitionStatus', 'setRunlimit', 'setFailed', # Control Partitions 30 'getPartitionStatus', 'getJobs', 'getPartitionJobs', 31 'overview', 'info', 'n_all', 'n_status', 'retryFailed' # Info 32 ] 33 34 # _app_status = {} 35 _partition_apps = None 36 37 # possible partition status values: 38 # ignored, hold, ready, running, completed, attempted, failed, bad 39 40 ## Special methods: 4420346 """A transform is read-only if the status is not new.""" 47 if self.status == "new": 48 return 0 49 return 150 53 5658 """This function is used to set the status after restarting Ganga""" 59 # Make sure that no partitions are kept "running" from previous sessions 60 clist = self._partition_status.keys() 61 for c in clist: 62 self.updatePartitionStatus(c) 63 # At this point the applications still need to notify the Transformation of their status 64 # Search jobs for task-supporting applications 65 id = "%i:%i"%(self._getParent().id,self._getParent().transforms.index(self)) 66 for j in GPI.jobs: 67 if "tasks_id" in j.application._impl._data: 68 #print "tasks_id of jobid ", j.fqid, j.application._impl._data["tasks_id"], id 69 if j.application._impl._data["tasks_id"].endswith(id): 70 try: 71 if j.subjobs: 72 for sj in j.subjobs: 73 app = sj.application._impl 74 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 75 else: 76 app = j.application._impl 77 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 78 except AttributeError, e: 79 logger.debug("%s",e) 80 pass8183 if self._partition_apps is None: 84 ## Create the reverse map _partition_apps from _app_partition 85 self._partition_apps = {} 86 for (app, partition) in self._app_partition.iteritems(): 87 if partition in self._partition_apps: 88 if not app in self._partition_apps[partition]: 89 self._partition_apps[partition].append(app) 90 else: 91 self._partition_apps[partition] = [app] 92 return self._partition_apps9395 """This function fixes inconsistencies in application status""" 96 ## Create the reverse map _partition_apps from _app_partition 97 self._app_status = {} 98 # Make sure that no partitions are kept "running" from previous sessions 99 clist = self._partition_status.keys() 100 for c in clist: 101 self.updatePartitionStatus(c) 102 # At this point the applications still need to notify the Transformation of their status 103 # Search jobs for task-supporting applications 104 105 id = "%i:%i"%(self._getParent().id,self._getParent().transforms.index(self)) 106 for j in GPI.jobs: 107 if "tasks_id" in j.application._impl._data: 108 if j.application._impl._data["tasks_id"] == id: 109 try: 110 if j.subjobs: 111 for sj in j.subjobs: 112 app = sj.application._impl 113 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 114 else: 115 app = j.application._impl 116 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 117 except AttributeError, e: 118 logger.error("%s",e)119 120 121 ## Public methods123 """Sets this transform to running status""" 124 if self.status == "new" and check: 125 self.check() 126 if self.status != "completed": 127 self.updateStatus("running") 128 #self.status = "running" 129 # Check if this transform has completed in the meantime 130 is_complete = True 131 for s in self._partition_status.values(): 132 if s != "completed" and s != "bad": 133 is_complete = False 134 break 135 if is_complete: 136 self.updateStatus("completed") 137 #self.status = "completed" 138 task = self._getParent() 139 if task: 140 task.updateStatus() 141 else: 142 logger.warning("Transform is already completed!")143145 """Pause the task - the background thread will not submit new jobs from this task""" 146 if self.status != "completed": 147 self.updateStatus("pause") 148 #self.status = "pause" 149 task = self._getParent() 150 if task: 151 task.updateStatus() 152 else: 153 logger.debug("Transform is already completed!")154156 """Set the number of times a job should be resubmitted before the transform is paused""" 157 self.run_limit = newRL 158 cs = self._partition_status.items() 159 for (c,s) in cs: 160 if s in ["attempted","failed"]: 161 failures = self.getPartitionFailures(c) 162 if failures >= newRL: 163 self._partition_status[c] = "failed" 164 else: 165 self._partition_status[c] = "attempted" 166 logger.debug("Runlimit set to %i", newRL)167169 """ Set the Status of the given partition to "ready", "hold", "bad" or "completed". 170 The status is then updated to the status indicated by the applications""" 171 self.setPartitionsStatus([partition],status)172 176178 """ Get the job slice that processed the given partition. Iterates over the job list. """ 179 task = self._getParent() 180 id = task.transforms.index(self) 181 if partition is None: 182 sname = "tasks(%i).transforms[%i].getJobs()"%(task.id,id) 183 else: 184 sname = "tasks(%i).transforms[%i].getPartitionJobs(%s)"%(task.id,id,partition) 185 jobslice = JobRegistrySlice(sname) 186 def addjob(j): 187 if partition is None or self._app_partition[j.application.id] == partition: 188 jobslice.objects[j.fqid] = stripProxy(j)189 190 for j in GPI.jobs: 191 try: 192 stid = j.application.tasks_id.split(":") 193 if int(stid[-2]) == task.id and int(stid[-1]) == id: 194 if j.subjobs: 195 for sj in j.subjobs: 196 addjob(sj) 197 else: 198 addjob(j) 199 except Exception, x: 200 #print x 201 pass 202 return JobRegistrySliceProxy(jobslice)205 """ Tells Tasks that all Applications that have executed this partition have actually failed.""" 206 for aid in self._app_partition: 207 if aid in self._app_status and self._app_status[aid] == "removed": 208 continue 209 # Save the status 210 self._app_status[aid] = "failed" 211 # Update the corresponding partition status 212 self.setPartitionStatus(partition, "ready")213215 """Retry all failed partitions (forget about failed jobs)""" 216 for aid in self._app_partition: 217 if aid in self._app_status and self._app_status[aid] == "failed": 218 self._app_status[aid] = "removed" 219 clist = self._partition_status.keys() 220 for c in clist: 221 self.updatePartitionStatus(c)222 223 ## Internal methods 227229 """Create Ganga Jobs for the next N partitions that are ready and submit them.""" 230 next = self.getNextPartitions(n) 231 if len(next) == 0: 232 return 0 233 numjobs = 0 234 for j in self.getJobsForPartitions(next): 235 j.application._impl.transition_update("submitting") 236 try: 237 j.submit() 238 except JobError: 239 logger.error("Error on job submission! The current transform will be paused until this problem is fixed.") 240 logger.error("type tasks(%i).run() to continue after the problem has been fixed.", self._getParent().id) 241 self.pause() 242 numjobs += 1 243 return numjobs244246 """warns the user if the application is not compatible """ 247 if app == None: 248 return None 249 if not "tasks_id" in stripProxy(app)._data: 250 return taskApp(app) 251 return app252254 """Reports status changes in application jobs 255 possible status values: 256 normal : (new, submitting,) submitted, running, completing, completed 257 failures : killed, failed 258 transient: incomplete (->new), unknown, removed""" 259 260 # Check if we know the occurring application... 261 if app.id == -1: 262 return 263 if not app.id in self._app_partition: 264 logger.warning("%s was contacted by an unknown application %i.", self.fqn(), app.id) 265 return 266 # Silently ignore message if the application is already removed or completed 267 if app.id in self._app_status and self._app_status[app.id] in ["removed","completed","failed"]: 268 return 269 # Check the status 270 if new_status == "completed" and not self.checkCompletedApp(app): 271 logger.error("%s app %i failed despite listed as completed!",self.fqn(), app.id) 272 new_status = "failed" 273 # Save the status 274 self._app_status[app.id] = new_status 275 # Update the corresponding partition status 276 self.updatePartitionStatus(self._app_partition[app.id])277279 """ Calculate the correct status of the given partition. 280 "completed" and "bad" is never changed here 281 "hold" is only changed to "completed" here. """ 282 #print "updatePartitionStatus ", partition, " transform ", self.id 283 ## If the partition has status, and is not in a fixed state, check it! 284 285 if partition in self._partition_status and (not self._partition_status[partition] in ["bad","completed"]): 286 ## if we have no applications, we are in "ready" state 287 if not partition in self.getPartitionApps(): 288 if self._partition_status[partition] != "hold": 289 self._partition_status[partition] = "ready" 290 else: 291 status = [self._app_status[app] for app in self.getPartitionApps()[partition] 292 if app in self._app_status and not self._app_status[app] in ["removed","killed"]] 293 ## Check if we have completed this partition 294 if "completed" in status: 295 self._partition_status[partition] = "completed" 296 ## Check if we are not on hold 297 elif self._partition_status[partition] != "hold": 298 ## Check if we are running 299 running = False 300 for stat in ["completing", "running", "submitted", "submitting"]: 301 if stat in status: 302 self._partition_status[partition] = "running" 303 running = True 304 break 305 if not running: 306 ## Check if we failed 307 #failures = len([stat for stat in status if stat in ["failed","new"]]) 308 failures = self.getPartitionFailures(partition) 309 310 if failures >= self.run_limit: 311 self._partition_status[partition] = "failed" 312 elif failures > 0: 313 self._partition_status[partition] = "attempted" 314 else: 315 ## Here we only have some "unknown" applications 316 ## This could prove difficult when launching new applications. Care has to be taken 317 ## to get the applications out of "unknown" stats as quickly as possible, to avoid double submissions. 318 #logger.warning("Partition with only unknown applications encountered. This is probably not a problem.") 319 self._partition_status[partition] = "ready" 320 ## Notify the next transform (if any) of the change in input status 321 self.notifyNextTransform(partition) 322 323 ## Update the Tasks status if necessary 324 task = self._getParent() 325 if partition in self._partition_status and self._partition_status[partition] in ["completed","bad"] and self.status == "running": 326 for s in self._partition_status.values(): 327 if s != "completed" and s != "bad": 328 return 329 #self.status = "completed" 330 self.updateStatus("completed") 331 if task: 332 task.updateStatus() 333 elif self.status == "completed": 334 for s in self._partition_status.values(): 335 if s != "completed" and s != "bad": 336 self.updateStatus("running") 337 #self.status = "running" 338 if task: 339 task.updateStatus() 340 return341343 """ Notify any dependant transforms of the input update """ 344 task = self._getParent() 345 if task and (task.transforms.index(self) + 1 < len(task.transforms)): 346 task.transforms[task.transforms.index(self) + 1].updateInputStatus(self, partition)347349 """ Set the Status of the partitions to "ready", "hold", "bad" or "completed". 350 The status is then updated to the status indicated by the applications 351 "bad" and "completed" is never changed except to "ignored", "hold" is only changed to "completed". """ 352 if status == "ignored": 353 [self._partition_status.pop(c) for c in partitions if c in self._partition_status] 354 elif status in ["ready","hold", "bad", "completed"]: 355 for c in partitions: 356 self._partition_status[c] = status 357 else: 358 logger.error("setPartitionsStatus called with invalid status string %s", status) 359 for c in partitions: 360 self.updatePartitionStatus(c)361363 """ Set all partitions from and including limitpartition to ignored """ 364 partitions = [c for c in self._partition_status if c >= limitpartition] 365 self.setPartitionsStatus(partitions,"ignored")366368 if partition in self._partition_status: 369 return self._partition_status[partition] 370 else: 371 return "ignored"372374 """Returns the N next partitions to process""" 375 partitionlist = [c for c in self._partition_status if self._partition_status[c] in ["ready","attempted"]] 376 partitionlist.sort() 377 return partitionlist[:n]378380 """ Returns a new application ID and associates this ID with the partition given. """ 381 id = self._next_app_id 382 self._app_partition[id] = partition 383 if partition in self.getPartitionApps(): 384 self.getPartitionApps()[partition].append(id) 385 else: 386 self.getPartitionApps()[partition] = [id] 387 self._next_app_id += 1 388 return id389391 """ Returns a new job initialized with the transforms application, backend and name """ 392 task = self._getParent() # this works because createNewJob is only called by a task 393 id = task.transforms.index(self) 394 j = GPI.Job() 395 j._impl.backend = self.backend.clone() 396 j._impl.application = self.application.clone() 397 j._impl.application.tasks_id = "%i:%i" % (task.id, id) 398 j._impl.application.id = self.getNewAppID(partition) 399 j.inputdata = self.inputdata 400 j.outputdata = self.outputdata 401 j.inputsandbox = self.inputsandbox 402 j.outputsandbox = self.outputsandbox 403 j.name = "T%i:%i C%i" % (task.id, id, partition) 404 return j405 406 ## Methods that can/should be overridden by derived classes 410412 """Is called my the last transform (ltf) if the partition 'partition' changes status""" 413 # per default no dependencies exist 414 pass415417 """This is only an example, this class should be overridden by derived classes""" 418 return [self.createNewJob(p) for p in partitions]419 420 ## Information methods422 task = self._getParent() 423 if task: 424 return "Task %i Transform %i" % (task.id, task.transforms.index(self)) 425 else: 426 return "Unassigned Transform '%s'" % (self.name)427429 return len(self._partition_status)430 433435 """ Get an ascii art overview over task status. Can be overridden """ 436 task = self._getParent() 437 if not task is None: 438 id = str(task.transforms.index(self)) 439 else: 440 id = "?" 441 o = markup("#%s: %s '%s'\n" % (id, self.__class__.__name__, self.name), status_colours[self.status]) 442 i = 0 443 partitions = self._partition_status.keys() 444 partitions.sort() 445 for c in partitions: 446 s = self._partition_status[c] 447 if c in self.getPartitionApps(): 448 failures = self.getPartitionFailures(c) 449 o += markup("%i:%i " % (c, failures), overview_colours[s]) 450 else: 451 o += markup("%i " % c, overview_colours[s]) 452 i+=1 453 if i % 20 == 0: o+="\n" 454 print o455457 print markup("%s '%s'" % (self.__class__.__name__, self.name), status_colours[self.status]) 458 print "* backend: %s" % self.backend.__class__.__name__ 459 print "Application:" 460 self.application.printTree()461463 """Return the number of failures for this partition""" 464 return len([1 for app in self.getPartitionApps()[partition] if app in self._app_status and self._app_status[app] in ["new","failed"]])465 469
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Mon Jun 25 10:35:26 2012 | http://epydoc.sourceforge.net |