Package Ganga :: Package GPIDev :: Package Lib :: Package Tasks :: Module Transform
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.GPIDev.Lib.Tasks.Transform

  1   
  2  from common import * 
  3  from sets import Set 
  4  from TaskApplication import ExecutableTask, taskApp 
  5  from Ganga.GPIDev.Lib.Job.Job import JobError 
  6  from Ganga.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy 
  7   
8 -class Transform(GangaObject):
9 _schema = Schema(Version(1,0), { 10 'status' : SimpleItem(defvalue='new', protected=1, copyable=0, doc='Status - running, pause or completed', typelist=["str"]), 11 'name' : SimpleItem(defvalue='Simple Transform', doc='Name of the transform (cosmetic)', typelist=["str"]), 12 'application' : ComponentItem('applications', defvalue=None, optional=1, load_default=False, filter="checkTaskApplication",doc='Application of the Transform. Must be a Task-Supporting application.'), 13 'inputsandbox' : FileItem(defvalue=[],typelist=['str','Ganga.GPIDev.Lib.File.File.File'],sequence=1,doc="list of File objects shipped to the worker node "), 14 'outputsandbox' : SimpleItem(defvalue=[],typelist=['str'],sequence=1,doc="list of filenames or patterns shipped from the worker node"), 15 'inputdata' : ComponentItem('datasets', defvalue=None, optional=1, load_default=False,doc='Input dataset'), 16 'outputdata' : ComponentItem('datasets', defvalue=None, optional=1, load_default=False,doc='Output dataset'), 17 'backend' : ComponentItem('backends', defvalue=None, optional=1,load_default=False, doc='Backend of the Transform.'), 18 'run_limit' : SimpleItem(defvalue=4, doc='Number of times a partition is tried to be processed.', protected=1, typelist=["int"]), 19 '_partition_status': SimpleItem(defvalue={}, hidden=1, copyable=0, doc='Map (only necessary) partitions to their status'), 20 '_app_partition' : SimpleItem(defvalue={}, hidden=1, copyable=0,doc='Map of applications to partitions'), 21 '_app_status' : SimpleItem(defvalue={}, hidden=1, copyable=0, doc='Map of applications to status'), 22 '_next_app_id' : SimpleItem(defvalue=0, hidden=1, copyable=0, doc='Next ID used for the application', typelist=["int"]), 23 }) 24 25 _category = 'transforms' 26 _name = 'Transform' 27 _exportmethods = [ 28 'run', 'pause', # Operations 29 'setPartitionStatus', 'setRunlimit', 'setFailed', # Control Partitions 30 'getPartitionStatus', 'getJobs', 'getPartitionJobs', 31 'overview', 'info', 'n_all', 
'n_status', 'retryFailed' # Info 32 ] 33 34 # _app_status = {} 35 _partition_apps = None 36 37 # possible partition status values: 38 # ignored, hold, ready, running, completed, attempted, failed, bad 39 40 ## Special methods:
41 - def __init__(self):
42 super(Transform,self).__init__() 43 self.initialize()
44
45 - def _readonly(self):
46 """A transform is read-only if the status is not new.""" 47 if self.status == "new": 48 return 0 49 return 1
50
51 - def initialize(self):
52 self.backend = stripProxy(GPI.Local())
53
54 - def check(self):
55 pass
56
57 - def startup(self):
58 """This function is used to set the status after restarting Ganga""" 59 # Make sure that no partitions are kept "running" from previous sessions 60 clist = self._partition_status.keys() 61 for c in clist: 62 self.updatePartitionStatus(c) 63 # At this point the applications still need to notify the Transformation of their status 64 # Search jobs for task-supporting applications 65 id = "%i:%i"%(self._getParent().id,self._getParent().transforms.index(self)) 66 for j in GPI.jobs: 67 if "tasks_id" in j.application._impl._data: 68 #print "tasks_id of jobid ", j.fqid, j.application._impl._data["tasks_id"], id 69 if j.application._impl._data["tasks_id"].endswith(id): 70 try: 71 if j.subjobs: 72 for sj in j.subjobs: 73 app = sj.application._impl 74 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 75 else: 76 app = j.application._impl 77 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 78 except AttributeError, e: 79 logger.debug("%s",e) 80 pass
81
82 - def getPartitionApps(self):
83 if self._partition_apps is None: 84 ## Create the reverse map _partition_apps from _app_partition 85 self._partition_apps = {} 86 for (app, partition) in self._app_partition.iteritems(): 87 if partition in self._partition_apps: 88 if not app in self._partition_apps[partition]: 89 self._partition_apps[partition].append(app) 90 else: 91 self._partition_apps[partition] = [app] 92 return self._partition_apps
93
94 - def fix(self):
95 """This function fixes inconsistencies in application status""" 96 ## Create the reverse map _partition_apps from _app_partition 97 self._app_status = {} 98 # Make sure that no partitions are kept "running" from previous sessions 99 clist = self._partition_status.keys() 100 for c in clist: 101 self.updatePartitionStatus(c) 102 # At this point the applications still need to notify the Transformation of their status 103 # Search jobs for task-supporting applications 104 105 id = "%i:%i"%(self._getParent().id,self._getParent().transforms.index(self)) 106 for j in GPI.jobs: 107 if "tasks_id" in j.application._impl._data: 108 if j.application._impl._data["tasks_id"] == id: 109 try: 110 if j.subjobs: 111 for sj in j.subjobs: 112 app = sj.application._impl 113 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 114 else: 115 app = j.application._impl 116 app.getTransform()._impl.setAppStatus(app,app._getParent().status) 117 except AttributeError, e: 118 logger.error("%s",e)
119 120 121 ## Public methods
122 - def run(self, check=True):
123 """Sets this transform to running status""" 124 if self.status == "new" and check: 125 self.check() 126 if self.status != "completed": 127 self.updateStatus("running") 128 #self.status = "running" 129 # Check if this transform has completed in the meantime 130 is_complete = True 131 for s in self._partition_status.values(): 132 if s != "completed" and s != "bad": 133 is_complete = False 134 break 135 if is_complete: 136 self.updateStatus("completed") 137 #self.status = "completed" 138 task = self._getParent() 139 if task: 140 task.updateStatus() 141 else: 142 logger.warning("Transform is already completed!")
143
144 - def pause(self):
145 """Pause the task - the background thread will not submit new jobs from this task""" 146 if self.status != "completed": 147 self.updateStatus("pause") 148 #self.status = "pause" 149 task = self._getParent() 150 if task: 151 task.updateStatus() 152 else: 153 logger.debug("Transform is already completed!")
154
155 - def setRunlimit(self,newRL):
156 """Set the number of times a job should be resubmitted before the transform is paused""" 157 self.run_limit = newRL 158 cs = self._partition_status.items() 159 for (c,s) in cs: 160 if s in ["attempted","failed"]: 161 failures = self.getPartitionFailures(c) 162 if failures >= newRL: 163 self._partition_status[c] = "failed" 164 else: 165 self._partition_status[c] = "attempted" 166 logger.debug("Runlimit set to %i", newRL)
167
168 - def setPartitionStatus(self, partition, status):
169 """ Set the Status of the given partition to "ready", "hold", "bad" or "completed". 170 The status is then updated to the status indicated by the applications""" 171 self.setPartitionsStatus([partition],status)
172
173 - def getJobs(self):
174 """ Get the job slice of all jobs for this transform """ 175 return self.getPartitionJobs(None)
176
177 - def getPartitionJobs(self, partition):
178 """ Get the job slice that processed the given partition. Iterates over the job list. """ 179 task = self._getParent() 180 id = task.transforms.index(self) 181 if partition is None: 182 sname = "tasks(%i).transforms[%i].getJobs()"%(task.id,id) 183 else: 184 sname = "tasks(%i).transforms[%i].getPartitionJobs(%s)"%(task.id,id,partition) 185 jobslice = JobRegistrySlice(sname) 186 def addjob(j): 187 if partition is None or self._app_partition[j.application.id] == partition: 188 jobslice.objects[j.fqid] = stripProxy(j)
189 190 for j in GPI.jobs: 191 try: 192 stid = j.application.tasks_id.split(":") 193 if int(stid[-2]) == task.id and int(stid[-1]) == id: 194 if j.subjobs: 195 for sj in j.subjobs: 196 addjob(sj) 197 else: 198 addjob(j) 199 except Exception, x: 200 #print x 201 pass 202 return JobRegistrySliceProxy(jobslice)
203
204 - def setFailed(self, partition):
205 """ Tells Tasks that all Applications that have executed this partition have actually failed.""" 206 for aid in self._app_partition: 207 if aid in self._app_status and self._app_status[aid] == "removed": 208 continue 209 # Save the status 210 self._app_status[aid] = "failed" 211 # Update the corresponding partition status 212 self.setPartitionStatus(partition, "ready")
213
214 - def retryFailed(self):
215 """Retry all failed partitions (forget about failed jobs)""" 216 for aid in self._app_partition: 217 if aid in self._app_status and self._app_status[aid] == "failed": 218 self._app_status[aid] = "removed" 219 clist = self._partition_status.keys() 220 for c in clist: 221 self.updatePartitionStatus(c)
222 223 ## Internal methods
224 - def finalise(self):
225 """Finalise the transform - no-op by default""" 226 return
227
228 - def submitJobs(self, n):
229 """Create Ganga Jobs for the next N partitions that are ready and submit them.""" 230 next = self.getNextPartitions(n) 231 if len(next) == 0: 232 return 0 233 numjobs = 0 234 for j in self.getJobsForPartitions(next): 235 j.application._impl.transition_update("submitting") 236 try: 237 j.submit() 238 except JobError: 239 logger.error("Error on job submission! The current transform will be paused until this problem is fixed.") 240 logger.error("type tasks(%i).run() to continue after the problem has been fixed.", self._getParent().id) 241 self.pause() 242 numjobs += 1 243 return numjobs
244
245 - def checkTaskApplication(self,app):
246 """warns the user if the application is not compatible """ 247 if app == None: 248 return None 249 if not "tasks_id" in stripProxy(app)._data: 250 return taskApp(app) 251 return app
252
253 - def setAppStatus(self, app, new_status):
254 """Reports status changes in application jobs 255 possible status values: 256 normal : (new, submitting,) submitted, running, completing, completed 257 failures : killed, failed 258 transient: incomplete (->new), unknown, removed""" 259 260 # Check if we know the occurring application... 261 if app.id == -1: 262 return 263 if not app.id in self._app_partition: 264 logger.warning("%s was contacted by an unknown application %i.", self.fqn(), app.id) 265 return 266 # Silently ignore message if the application is already removed or completed 267 if app.id in self._app_status and self._app_status[app.id] in ["removed","completed","failed"]: 268 return 269 # Check the status 270 if new_status == "completed" and not self.checkCompletedApp(app): 271 logger.error("%s app %i failed despite listed as completed!",self.fqn(), app.id) 272 new_status = "failed" 273 # Save the status 274 self._app_status[app.id] = new_status 275 # Update the corresponding partition status 276 self.updatePartitionStatus(self._app_partition[app.id])
277
278 - def updatePartitionStatus(self, partition):
279 """ Calculate the correct status of the given partition. 280 "completed" and "bad" is never changed here 281 "hold" is only changed to "completed" here. """ 282 #print "updatePartitionStatus ", partition, " transform ", self.id 283 ## If the partition has status, and is not in a fixed state, check it! 284 285 if partition in self._partition_status and (not self._partition_status[partition] in ["bad","completed"]): 286 ## if we have no applications, we are in "ready" state 287 if not partition in self.getPartitionApps(): 288 if self._partition_status[partition] != "hold": 289 self._partition_status[partition] = "ready" 290 else: 291 status = [self._app_status[app] for app in self.getPartitionApps()[partition] 292 if app in self._app_status and not self._app_status[app] in ["removed","killed"]] 293 ## Check if we have completed this partition 294 if "completed" in status: 295 self._partition_status[partition] = "completed" 296 ## Check if we are not on hold 297 elif self._partition_status[partition] != "hold": 298 ## Check if we are running 299 running = False 300 for stat in ["completing", "running", "submitted", "submitting"]: 301 if stat in status: 302 self._partition_status[partition] = "running" 303 running = True 304 break 305 if not running: 306 ## Check if we failed 307 #failures = len([stat for stat in status if stat in ["failed","new"]]) 308 failures = self.getPartitionFailures(partition) 309 310 if failures >= self.run_limit: 311 self._partition_status[partition] = "failed" 312 elif failures > 0: 313 self._partition_status[partition] = "attempted" 314 else: 315 ## Here we only have some "unknown" applications 316 ## This could prove difficult when launching new applications. Care has to be taken 317 ## to get the applications out of "unknown" stats as quickly as possible, to avoid double submissions. 318 #logger.warning("Partition with only unknown applications encountered. 
This is probably not a problem.") 319 self._partition_status[partition] = "ready" 320 ## Notify the next transform (if any) of the change in input status 321 self.notifyNextTransform(partition) 322 323 ## Update the Tasks status if necessary 324 task = self._getParent() 325 if partition in self._partition_status and self._partition_status[partition] in ["completed","bad"] and self.status == "running": 326 for s in self._partition_status.values(): 327 if s != "completed" and s != "bad": 328 return 329 #self.status = "completed" 330 self.updateStatus("completed") 331 if task: 332 task.updateStatus() 333 elif self.status == "completed": 334 for s in self._partition_status.values(): 335 if s != "completed" and s != "bad": 336 self.updateStatus("running") 337 #self.status = "running" 338 if task: 339 task.updateStatus() 340 return
341
342 - def notifyNextTransform(self, partition):
343 """ Notify any dependant transforms of the input update """ 344 task = self._getParent() 345 if task and (task.transforms.index(self) + 1 < len(task.transforms)): 346 task.transforms[task.transforms.index(self) + 1].updateInputStatus(self, partition)
347
348 - def setPartitionsStatus(self, partitions, status):
349 """ Set the Status of the partitions to "ready", "hold", "bad" or "completed". 350 The status is then updated to the status indicated by the applications 351 "bad" and "completed" is never changed except to "ignored", "hold" is only changed to "completed". """ 352 if status == "ignored": 353 [self._partition_status.pop(c) for c in partitions if c in self._partition_status] 354 elif status in ["ready","hold", "bad", "completed"]: 355 for c in partitions: 356 self._partition_status[c] = status 357 else: 358 logger.error("setPartitionsStatus called with invalid status string %s", status) 359 for c in partitions: 360 self.updatePartitionStatus(c)
361
362 - def setPartitionsLimit(self, limitpartition):
363 """ Set all partitions from and including limitpartition to ignored """ 364 partitions = [c for c in self._partition_status if c >= limitpartition] 365 self.setPartitionsStatus(partitions,"ignored")
366
367 - def getPartitionStatus(self, partition):
368 if partition in self._partition_status: 369 return self._partition_status[partition] 370 else: 371 return "ignored"
372
373 - def getNextPartitions(self, n):
374 """Returns the N next partitions to process""" 375 partitionlist = [c for c in self._partition_status if self._partition_status[c] in ["ready","attempted"]] 376 partitionlist.sort() 377 return partitionlist[:n]
378
379 - def getNewAppID(self, partition):
380 """ Returns a new application ID and associates this ID with the partition given. """ 381 id = self._next_app_id 382 self._app_partition[id] = partition 383 if partition in self.getPartitionApps(): 384 self.getPartitionApps()[partition].append(id) 385 else: 386 self.getPartitionApps()[partition] = [id] 387 self._next_app_id += 1 388 return id
389
390 - def createNewJob(self, partition):
391 """ Returns a new job initialized with the transforms application, backend and name """ 392 task = self._getParent() # this works because createNewJob is only called by a task 393 id = task.transforms.index(self) 394 j = GPI.Job() 395 j._impl.backend = self.backend.clone() 396 j._impl.application = self.application.clone() 397 j._impl.application.tasks_id = "%i:%i" % (task.id, id) 398 j._impl.application.id = self.getNewAppID(partition) 399 j.inputdata = self.inputdata 400 j.outputdata = self.outputdata 401 j.inputsandbox = self.inputsandbox 402 j.outputsandbox = self.outputsandbox 403 j.name = "T%i:%i C%i" % (task.id, id, partition) 404 return j
405 406 ## Methods that can/should be overridden by derived classes
407 - def checkCompletedApp(self, app):
408 """Can be overriden to improve application completeness checking""" 409 return True
410
411 - def updateInputStatus(self, ltf, partition):
412 """Is called my the last transform (ltf) if the partition 'partition' changes status""" 413 # per default no dependencies exist 414 pass
415
416 - def getJobsForPartitions(self, partitions):
417 """This is only an example, this class should be overridden by derived classes""" 418 return [self.createNewJob(p) for p in partitions]
419 420 ## Information methods
421 - def fqn(self):
422 task = self._getParent() 423 if task: 424 return "Task %i Transform %i" % (task.id, task.transforms.index(self)) 425 else: 426 return "Unassigned Transform '%s'" % (self.name)
427
428 - def n_all(self):
429 return len(self._partition_status)
430
431 - def n_status(self,status):
432 return len([cs for cs in self._partition_status.values() if cs == status])
433
434 - def overview(self):
435 """ Get an ascii art overview over task status. Can be overridden """ 436 task = self._getParent() 437 if not task is None: 438 id = str(task.transforms.index(self)) 439 else: 440 id = "?" 441 o = markup("#%s: %s '%s'\n" % (id, self.__class__.__name__, self.name), status_colours[self.status]) 442 i = 0 443 partitions = self._partition_status.keys() 444 partitions.sort() 445 for c in partitions: 446 s = self._partition_status[c] 447 if c in self.getPartitionApps(): 448 failures = self.getPartitionFailures(c) 449 o += markup("%i:%i " % (c, failures), overview_colours[s]) 450 else: 451 o += markup("%i " % c, overview_colours[s]) 452 i+=1 453 if i % 20 == 0: o+="\n" 454 print o
455
456 - def info(self):
457 print markup("%s '%s'" % (self.__class__.__name__, self.name), status_colours[self.status]) 458 print "* backend: %s" % self.backend.__class__.__name__ 459 print "Application:" 460 self.application.printTree()
461
462 - def getPartitionFailures(self, partition):
463 """Return the number of failures for this partition""" 464 return len([1 for app in self.getPartitionApps()[partition] if app in self._app_status and self._app_status[app] in ["new","failed"]])
465
466 - def updateStatus(self, status):
467 """Update the transform status""" 468 self.status = status
469