Package Ganga :: Package GPIDev :: Package Lib :: Package Tasks :: Module Task
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.GPIDev.Lib.Tasks.Task

  1  from common import * 
  2  from Ganga.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy 
  3  import time 
  4   
  5  ######################################################################## 
  6   
class Task(GangaObject):
    """This is a Task without special properties

    A Task holds an ordered list of transforms (processing steps). Its
    'float' attribute limits how many of the task's jobs may run at once;
    submitJobs() tops the running jobs up to that number. The task status
    is derived from the status of its transforms (see updateStatus()).
    """
    _schema = Schema(Version(1,0), {
        'transforms'   : ComponentItem('transforms',defvalue=[],sequence=1,copyable=1,doc='list of transforms'),
        'id'           : SimpleItem(defvalue=-1, protected=1, doc='ID of the Task', typelist=["int"]),
        'name'         : SimpleItem(defvalue='NewTask', copyable=1, doc='Name of the Task', typelist=["str"]),
        'status'       : SimpleItem(defvalue='new', protected=1, doc='Status - new, running, pause or completed', typelist=["str"]),
        'float'        : SimpleItem(defvalue=0, copyable=1, doc='Number of Jobs run concurrently', typelist=["int"]),
        'resub_limit'  : SimpleItem(defvalue=0.9, copyable=1, doc='Resubmit only if the number of running jobs is less than "resub_limit" times the float. This makes the job table clearer, since more jobs can be submitted as subjobs.', typelist=["float"]),
        'creation_date': SimpleItem(defvalue="19700101",copyable=0,hidden=1,doc='Creation date of the task (used in dq2 datasets)', typelist=["str"]),
        })

    _category = 'tasks'
    _name = 'Task'
    _exportmethods = [
        'setBackend', 'setParameter', 'insertTransform', 'appendTransform', 'removeTransform', # Settings
        'check', 'run', 'pause', 'remove', # Operations
        'overview', 'info', 'n_all', 'n_status', 'help', 'getJobs' # Info
        ]

    default_registry = "tasks"

    ## Special methods:
    def _auto__init__(self,registry=None):
        """Register a newly created task and initialise it.

        registry: the registry to add the task to. If None, the registry
            named by default_registry ("tasks") is looked up.
        """
        if registry is None:
            from Ganga.Core.GangaRepository import getRegistry
            registry = getRegistry(self.default_registry)
        # register the task (it will also commit it);
        # the task gets its id now
        registry._add(self)
        self.creation_date = time.strftime('%Y%m%d%H%M%S')
        self.initialize()
        self.startup()
        self._setDirty()

    def initialize(self):
        """Hook called once when a task is created; does nothing here."""
        pass

    def startup(self):
        """Startup function on Ganga startup

        Forwards the call to startup() of every transform.
        """
        for t in self.transforms:
            t.startup()

    ## Public methods:
    #
    # - remove() a task
    # - clone() a task
    # - check() a task (if updated)
    # - run() a task to start processing
    # - pause() to interrupt processing
    # - setBackend(be) for all transforms
    # - setParameter(myParam=True) for all transforms
    # - insertTransform(id, tf) insert a new processing step
    # - removeTransform(id) remove a processing step

    def remove(self,remove_jobs="do_nothing"):
        """Delete the task.

        remove_jobs must be passed explicitly:
          True  - also remove every job whose application.tasks_id refers
                  to this task,
          False - keep the jobs.
        With any other value (including the default) only instructions are
        printed and nothing is removed.
        """
        if remove_jobs not in [True,False]:
            print("You want to remove the task %i named '%s'." % (self.id,self.name))
            print("Since this operation cannot be easily undone, please call this command again:")
            print(" * as tasks(%i).remove(remove_jobs=True) if you want to remove all associated jobs," % (self.id))
            print(" * as tasks(%i).remove(remove_jobs=False) if you want to keep the jobs." % (self.id))
            return
        if remove_jobs:
            import sys
            for j in GPI.jobs:
                try:
                    # tasks_id is ':'-separated; its second-to-last field is
                    # the owning task id.
                    stid = j.application.tasks_id.split(":")
                    belongs_here = int(stid[-2]) == self.id
                except Exception:
                    # Best effort: jobs that were not created by a task (no or
                    # unparsable tasks_id) are simply skipped.
                    continue
                if not belongs_here:
                    continue
                try:
                    j.remove()
                except Exception:
                    # Do not abort the task removal, but do not hide the
                    # failure either: the job will remain in the job registry.
                    logger.warning("Could not remove job %s of task %i: %s", j.fqid, self.id, sys.exc_info()[1])
        self._getRegistry()._remove(self)
        logger.info("Task #%s deleted" % self.id)

    def clone(self):
        """Return a copy of this task with every transform reset to "new".

        The copy is check()ed before it is returned.
        """
        c = super(Task,self).clone()
        for tf in c.transforms:
            tf.status = "new"
            # This is cleared separately since it is not in the schema.
            # NOTE(review): getJobs() reads tf._app_partition, not
            # tf._partition_apps -- confirm which attribute is meant here.
            tf._partition_apps = {}
        c.check()
        return c

    def check(self):
        """This function is called by run() or manually by the user

        check() may modify the task, so it is only allowed while the status
        is "new"; otherwise an error is logged and None is returned.
        Calls check() on every transform and returns True. The task status is
        recomputed afterwards even if a transform's check() raises.
        """
        if self.status != "new":
            logger.error("The check() function may modify a task and can therefore only be called on new tasks!")
            return
        try:
            for t in self.transforms:
                t.check()
        finally:
            self.updateStatus()
        return True

    def run(self):
        """Confirms that this task is fully configured and ready to be run.

        A new task is check()ed first. Unless the task is completed, run()
        is called on every transform that is not completed. The task status
        is recomputed afterwards even if a transform raises.
        """
        if self.status == "new":
            self.check()

        if self.status == "completed":
            logger.info("Task is already completed!")
            return
        if self.float == 0:
            logger.warning("The 'float', the number of jobs this task may run, is still zero. Type 'tasks(%i).float = 5' to allow this task to submit 5 jobs at a time" % self.id)
        try:
            for tf in self.transforms:
                if tf.status != "completed":
                    tf.run(check=False)
        finally:
            self.updateStatus()

    def pause(self):
        """Pause the task - the background thread will not submit new jobs from this task"""
        # The float is zeroed while the transforms are being paused --
        # presumably so no new jobs are submitted meanwhile. The user's value
        # is always restored, even if a transform's pause() raises.
        float_cache = self.float
        self.float = 0
        try:
            if self.status != "completed":
                for tf in self.transforms:
                    tf.pause()
                self.status = "pause"
            else:
                logger.info("Task is already completed!")
        finally:
            self.float = float_cache

    def setBackend(self,backend):
        """Sets the backend on all transforms

        Each transform gets its own clone of the (proxy-stripped) backend;
        passing None clears the backend of every transform.
        """
        for tf in self.transforms:
            if backend is None:
                tf.backend = None
            else:
                tf.backend = stripProxy(backend).clone()

    def setParameter(self,**args):
        """Use: setParameter(processName="HWW") to set the processName in all applications to "HWW"
        Warns if applications are not affected because they lack the parameter

        Transforms without an application are skipped silently.
        """
        for name, parm in args.items():
            for tf in [t for t in self.transforms if t.application]:
                if name in tf.application._data:
                    # Assign through the proxy (as the original code did).
                    setattr(addProxy(tf.application), name, parm)
                else:
                    # tf.name is a string, hence %s (a %i here broke the log call).
                    logger.warning("Transform %s was not affected!", tf.name)

    def insertTransform(self, id, tf):
        """Insert transform tf before index id (counting from 0)

        Unless the task is new, transforms may only be added at the end of
        the list; otherwise an error is logged and nothing is inserted.
        """
        if self.status != "new" and id < len(self.transforms):
            logger.error("You can only insert transforms at the end of the list. Only if a task is new it can be freely modified!")
            return
        # Inserting a copy (tf.copy()) would be safer, but breaks user
        # expectations. As a consequence t.insertTransform(0, t2.transforms[0])
        # makes both tasks share one transform object.
        self.transforms.insert(id,tf)

    def appendTransform(self, tf):
        """Append transform tf at the end of the transform list."""
        return self.insertTransform(len(self.transforms), tf)

    def removeTransform(self, id):
        """Remove the transform with the index id (counting from 0)

        Only allowed while the task is new; otherwise an error is logged.
        """
        if self.status != "new":
            logger.error("You can only remove transforms if the task is new!")
            return
        del self.transforms[id]

    def getJobs(self, transform=None, partition=None, only_master_jobs=True):
        """ Get the job slice of all jobs that process this task

        transform: only jobs of the transform with this index.
        partition: only jobs processing this partition. It is applied
            together with transform only, and forces only_master_jobs=False.
        only_master_jobs: if False, the subjobs of a master job are listed
            instead of the master job itself.
        """
        if partition is not None:
            only_master_jobs = False
        jobslice = JobRegistrySlice("tasks(%i).getJobs(transform=%s, partition=%s, only_master_jobs=%s)"%(self.id, transform, partition, only_master_jobs))

        def addjob(j):
            # Add j unless a partition filter is active and j's application
            # belongs to a different partition.
            if transform is None or partition is None or self.transforms[int(transform)]._app_partition[j.application.id] == partition:
                jobslice.objects[j.fqid] = stripProxy(j)

        for j in GPI.jobs:
            try:
                # tasks_id is ':'-separated: [..., task id, transform index]
                stid = j.application.tasks_id.split(":")
                if int(stid[-2]) == self.id and (transform is None or stid[-1] == str(transform)):
                    if j.subjobs and not only_master_jobs:
                        for sj in j.subjobs:
                            addjob(sj)
                    else:
                        addjob(j)
            except Exception:
                # Best effort: jobs not created by a task are skipped.
                # NOTE(review): this also swallows errors raised in addjob()
                # (e.g. a missing _app_partition entry), which skips the rest
                # of that job's subjobs -- confirm this is intended.
                pass
        return JobRegistrySliceProxy(jobslice)

    ## Internal methods
    def finaliseTransforms(self):
        """Check for any things needing doing after a transform has completed

        Calls finalise() on every transform.
        """
        for t in self.transforms:
            t.finalise()

    def updateStatus(self):
        """Updates status based on transform status.
        Called from check() or if status of a transform changes

        Precedence: running and pause together -> "running/pause", then
        "running", "pause", "new", "completed"; a task without transforms is
        "new". Logs status transitions and returns the new status.
        """
        # Calculate status from transform status:
        states = [tf.status for tf in self.transforms]
        if "running" in states and "pause" in states:
            new_status = "running/pause"
        elif "running" in states:
            new_status = "running"
        elif "pause" in states:
            new_status = "pause"
        elif "new" in states:
            new_status = "new"
        elif "completed" in states:
            new_status = "completed"
        else:
            new_status = "new" # no transforms
        # Handle status changes here:
        if self.status != new_status:
            if new_status == "running/pause":
                logger.info("Some Transforms of Task %i '%s' have been paused. Check tasks.table() for details!" % (self.id, self.name))
            elif new_status == "completed":
                logger.warning("Task %i '%s' has completed!" % (self.id, self.name))
            elif self.status == "completed":
                logger.warning("Task %i '%s' has been reopened!" % (self.id, self.name))
        self.status = new_status
        return self.status

    def submitJobs(self):
        """Submits as many jobs as necessary to maintain the float. Internal

        Transforms are visited from last to first, and only while the task
        is running. The number of running jobs is re-evaluated for every
        transform because earlier submissions change it. Jobs are submitted
        only while running <= resub_limit * float.
        Returns the number of jobs submitted.
        """
        numjobs = 0
        for tf in reversed(self.transforms):
            if not self.status == "running":
                break
            running = self.n_status("running")
            to_run = self.float - running
            run = (self.resub_limit * self.float >= running)
            if tf.status == "running" and to_run > 0 and run:
                numjobs += tf.submitJobs(to_run)
        return numjobs

    ## Information methods
    def n_all(self):
        """Total of n_all() over all transforms."""
        return sum([t.n_all() for t in self.transforms])

    def n_status(self,status):
        """Total of n_status(status) over all transforms."""
        return sum([t.n_status(status) for t in self.transforms])

    def overview(self):
        """ Get an ascii art overview over task status. Can be overridden """
        print("Colours: " + ", ".join([markup(key, overview_colours[key])
            for key in ["hold", "ready", "running", "completed", "attempted", "failed", "bad", "unknown"]]))
        print("Lists the partitions of events that are processed in one job, and the number of failures to process it.")
        print("Format: (partition number)[:(number of failed attempts)]")
        print("")
        for t in self.transforms:
            t.overview()

    def info(self):
        """Print the info() of every transform."""
        for t in self.transforms:
            t.info()

    def help(self):
        """Print a short description of this task type."""
        print("This is a Task without special properties")
269