1 from common import *
2 from Ganga.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy
3 import time
4
5
6
class Task(GangaObject):
    """This is a Task without special properties

    A Task holds an ordered list of transforms. While the task is running,
    jobs of its transforms are submitted so that up to `float` jobs of the
    task run concurrently (see `resub_limit` for when resubmission happens).
    """
    _schema = Schema(Version(1,0), {
        'transforms'   : ComponentItem('transforms',defvalue=[],sequence=1,copyable=1,doc='list of transforms'),
        'id'           : SimpleItem(defvalue=-1, protected=1, doc='ID of the Task', typelist=["int"]),
        'name'         : SimpleItem(defvalue='NewTask', copyable=1, doc='Name of the Task', typelist=["str"]),
        # updateStatus() can also produce the mixed state "running/pause".
        'status'       : SimpleItem(defvalue='new', protected=1, doc='Status - new, running, pause, running/pause or completed', typelist=["str"]),
        'float'        : SimpleItem(defvalue=0, copyable=1, doc='Number of Jobs run concurrently', typelist=["int"]),
        'resub_limit'  : SimpleItem(defvalue=0.9, copyable=1, doc='Resubmit only if the number of running jobs is less than "resub_limit" times the float. This makes the job table clearer, since more jobs can be submitted as subjobs.', typelist=["float"]),
        'creation_date': SimpleItem(defvalue="19700101",copyable=0,hidden=1,doc='Creation date of the task (used in dq2 datasets)', typelist=["str"]),
        })

    _category = 'tasks'
    _name = 'Task'
    # Methods made available on the user-facing (GPI) proxy of a Task.
    _exportmethods = [
        'setBackend', 'setParameter', 'insertTransform', 'appendTransform', 'removeTransform',
        'check', 'run', 'pause', 'remove',
        'overview', 'info', 'n_all', 'n_status', 'help', 'getJobs'
        ]

    default_registry = "tasks"
29
41
44
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 - def remove(self,remove_jobs="do_nothing"):
69 """Delete the task"""
70 if not remove_jobs in [True,False]:
71 print "You want to remove the task %i named '%s'." % (self.id,self.name)
72 print "Since this operation cannot be easily undone, please call this command again:"
73 print " * as tasks(%i).remove(remove_jobs=True) if you want to remove all associated jobs," % (self.id)
74 print " * as tasks(%i).remove(remove_jobs=False) if you want to keep the jobs." % (self.id)
75 return
76 if remove_jobs:
77 for j in GPI.jobs:
78 try:
79 stid = j.application.tasks_id.split(":")
80 if int(stid[-2]) == self.id:
81 j.remove()
82 except Exception, x:
83 pass
84 self._getRegistry()._remove(self)
85 logger.info("Task #%s deleted" % self.id)
86
95
97 """This function is called by run() or manually by the user"""
98 if self.status != "new":
99 logger.error("The check() function may modify a task and can therefore only be called on new tasks!")
100 return
101 try:
102 for t in self.transforms:
103 t.check()
104 finally:
105 self.updateStatus()
106 return True
107
109 """Confirms that this task is fully configured and ready to be run."""
110 if self.status == "new":
111 self.check()
112
113 if self.status != "completed":
114 if self.float == 0:
115 logger.warning("The 'float', the number of jobs this task may run, is still zero. Type 'tasks(%i).float = 5' to allow this task to submit 5 jobs at a time" % self.id)
116 try:
117 for tf in self.transforms:
118 if tf.status != "completed":
119 tf.run(check=False)
120
121 finally:
122 self.updateStatus()
123 else:
124 logger.info("Task is already completed!")
125
126
128 """Pause the task - the background thread will not submit new jobs from this task"""
129 float_cache = self.float
130 self.float = 0
131 if self.status != "completed":
132 for tf in self.transforms:
133 tf.pause()
134 self.status = "pause"
135 else:
136 logger.info("Transform is already completed!")
137 self.float = float_cache
138
146
156
164
168
175
def getJobs(self, transform=None, partition=None, only_master_jobs=True):
    """ Get the job slice of all jobs that process this task

    Jobs are matched through their application's tasks_id, whose last two
    ':'-separated fields are taken as the task id and the transform number.

    transform        -- only return jobs of this transform (its index in
                        self.transforms; an int or int-like string).
    partition        -- forces subjobs to be listed (only_master_jobs is
                        ignored). Together with `transform`, only (sub)jobs
                        whose application id is mapped to this partition in
                        that transform's _app_partition are returned; without
                        `transform` it does not filter anything.
    only_master_jobs -- if False, list the subjobs of split jobs instead of
                        the master job.

    Returns a JobRegistrySliceProxy over the matching jobs.
    """
    if partition is not None:
        only_master_jobs = False
    jobslice = JobRegistrySlice("tasks(%i).getJobs(transform=%s, partition=%s, only_master_jobs=%s)"%(self.id, transform, partition, only_master_jobs))
    filter_partition = transform is not None and partition is not None

    def addjob(j):
        """Add j to the slice unless the partition filter excludes it."""
        if filter_partition:
            try:
                job_partition = self.transforms[int(transform)]._app_partition[j.application.id]
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                # No partition recorded for this job, so it cannot match.
                # Skip only this job; its sibling subjobs are still examined.
                return
            if job_partition != partition:
                return
        jobslice.objects[j.fqid] = stripProxy(j)

    for j in GPI.jobs:
        try:
            stid = j.application.tasks_id.split(":")
            job_task_id = int(stid[-2])
        except (AttributeError, IndexError, TypeError, ValueError):
            # Not a task job (no or malformed tasks_id): ignore it.
            continue
        if job_task_id != self.id:
            continue
        if transform is not None and stid[-1] != str(transform):
            continue
        if j.subjobs and not only_master_jobs:
            for sj in j.subjobs:
                addjob(sj)
        else:
            addjob(j)
    return JobRegistrySliceProxy(jobslice)
198
199
204
206 """Updates status based on transform status.
207 Called from check() or if status of a transform changes"""
208
209 states = [tf.status for tf in self.transforms]
210 if "running" in states and "pause" in states:
211 new_status = "running/pause"
212 elif "running" in states:
213 new_status = "running"
214 elif "pause" in states:
215 new_status = "pause"
216 elif "new" in states:
217 new_status = "new"
218 elif "completed" in states:
219 new_status = "completed"
220 else:
221 new_status = "new"
222
223 if self.status != new_status:
224 if new_status == "running/pause":
225 logger.info("Some Transforms of Task %i '%s' have been paused. Check tasks.table() for details!" % (self.id, self.name))
226 elif new_status == "completed":
227 logger.warning("Task %i '%s' has completed!" % (self.id, self.name))
228 elif self.status == "completed":
229 logger.warning("Task %i '%s' has been reopened!" % (self.id, self.name))
230 self.status = new_status
231 return self.status
232
234 """Submits as many jobs as necessary to maintain the float. Internal"""
235 numjobs = 0
236 for i in range(len(self.transforms)-1,-1,-1):
237 if not self.status == "running":
238 break
239 tf = self.transforms[i]
240 to_run = self.float - self.n_status("running")
241 run = (self.resub_limit * self.float >= self.n_status("running"))
242 if tf.status == "running" and to_run > 0 and run:
243 numjobs += tf.submitJobs(to_run)
244 return numjobs
245
246
249
252
254 """ Get an ascii art overview over task status. Can be overridden """
255 print "Colours: " + ", ".join([markup(key, overview_colours[key])
256 for key in ["hold", "ready", "running", "completed", "attempted", "failed", "bad", "unknown"]])
257 print "Lists the partitions of events that are processed in one job, and the number of failures to process it."
258 print "Format: (partition number)[:(number of failed attempts)]"
259 print
260 for t in self.transforms:
261 t.overview()
262
266
268 print "This is a Task without special properties"
269