1 from common import *
2 import time
3
# Coloured short status labels. str_done is also used as a column key by the
# tasks_* display options registered in this module.
str_done, str_run, str_fail, str_hold, str_bad = (
    markup("done", overview_colours["completed"]),
    markup("run", overview_colours["running"]),
    markup("fail", overview_colours["failed"]),
    markup("hold", overview_colours["hold"]),
    markup("bad", overview_colours["bad"]),
)
9
10
11 from Ganga.GPIDev.Lib.Registry.RegistrySlice import config
# Display options for the task registry, added to the RegistrySlice `config`
# section imported above. Column keys are attribute names or the coloured
# str_done label defined earlier in this module.

# Which columns appear, in order. (Help text previously said "job attributes";
# these are columns of the *task* table.)
config.addOption('tasks_columns',
                 ("id", "Type", "Name", "Status", "Jobs", str_done),
                 'list of task attributes to be printed in separate columns')

# Fixed widths for selected columns; columns not listed use the default width.
config.addOption('tasks_columns_width',
                 {"id": 5, "Name": 30, 'Jobs': 6, str_done: 6},
                 'width of each column')

# Source strings of the per-column value extractors (stored as text, each
# taking a task object).
config.addOption('tasks_columns_functions',
                 {'Name': "lambda t : t.name",
                  'Type': "lambda task : task._name",
                  'Status': "lambda task : task.status",
                  'Jobs': "lambda task : task.n_all()",
                  str_done: "lambda task : task.n_status('completed')",
                  },
                 'optional converter functions')

# Columns whose values are shown even when they evaluate to false (e.g. 0).
config.addOption('tasks_columns_show_empty',
                 ['id', 'Jobs', str_done],
                 'with exception of columns mentioned here, hide all values which evaluate to logical false (so 0,"",[],...)')

# Whether the help screen is printed the first time "tasks" is displayed.
config.addOption('tasks_show_help', True, 'change this to False if you do not want to see the help screen if you first type "tasks" in a session')
34
35 from Ganga.Core.GangaRepository.Registry import Registry, RegistryError, RegistryKeyError, RegistryAccessError
36
37
43
45 cached_values = ['status','id','name']
46 c = {}
47 for cv in cached_values:
48 if cv in obj._data:
49 c[cv] = obj._data[cv]
50 slice = TaskRegistrySlice("tmp")
51 for dpv in slice._display_columns:
52 c["display:"+dpv] = slice._get_display_value(obj, dpv)
53 return c
54
def _thread_main(self):
    """Main loop of the background task-monitoring thread (internal).

    Steps, in order:

    1. For every (basename, name) pair in ``TaskApplication.handler_map``,
       register the runtime handler of application ``basename`` under
       ``name`` for every backend that ``basename`` supports.
    2. Poll every 0.1 s until the "jobs" registry has started and the
       monitoring component exists and is enabled. Return early if the
       thread is asked to stop.
    3. Acquire write access to every task and call its ``startup()``. Tasks
       raising ``RegistryError`` are skipped. Other errors are logged and
       that task is skipped instead of killing the thread.
    4. Until asked to stop, whenever monitoring is enabled, process each task
       whose status is "running" or "running/pause":
       - pause it if failed jobs exceed 20% of (20 + completed jobs);
       - otherwise submit jobs (flushing the task if any were submitted),
         finalise transforms and update its status.
       Any exception while processing a task is logged and the task is
       paused. Between passes, sleep about 10 s in 0.1 s steps so that a
       stop request is noticed promptly.
    """
    import sys  # sys.exc_info() binds the exception on any Python 2.x/3.x

    from Ganga.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
    from TaskApplication import handler_map
    for basename, name in handler_map:
        for backend in allHandlers.getAllBackends(basename):
            allHandlers.add(name, backend, allHandlers.get(basename, backend))

    from Ganga.Core.GangaRepository import getRegistry
    while not getRegistry("jobs")._started:
        time.sleep(0.1)
        if self._main_thread.should_stop():
            return

    while True:
        # Re-imported on every poll: the module attribute may still be None.
        from Ganga.Core import monitoring_component
        if monitoring_component is not None and monitoring_component.enabled:
            break
        time.sleep(0.1)
        if self._main_thread.should_stop():
            return

    for tid in self.ids():
        try:
            task = self[tid]
            task._getWriteAccess()
            task.startup()
        except RegistryError:
            continue
        except Exception:
            # Previously this escaped and silently ended the monitoring
            # thread for every task; now only this task is skipped.
            err = sys.exc_info()[1]
            logger.error("Failed to start up task %s: %s %s" % (tid, err.__class__, err))
            continue

    while not self._main_thread.should_stop():
        if monitoring_component.enabled:
            for tid in self.ids():
                try:
                    p = self[tid]
                    if p.status not in ("running", "running/pause"):
                        continue
                    p._getWriteAccess()
                except RegistryError:
                    continue
                if self._main_thread.should_stop():
                    break
                try:
                    n_failed = p.n_status("failed")
                    n_completed = p.n_status("completed")
                    # The +20 offset keeps a few early failures from pausing
                    # a task that has not completed anything yet.
                    if n_failed * 100.0 / (20 + n_completed) > 20:
                        p.pause()
                        logger.error("Task %s paused - %i jobs have failed while only %i jobs have completed successfully." % (p.name, n_failed, n_completed))
                        logger.error("Please investigate the cause of the failing jobs and then remove the previously failed jobs using job.remove()")
                        logger.error("You can then continue to run this task with tasks(%i).run()" % p.id)
                        continue
                    if p.submitJobs() > 0:
                        self._flush([p])
                    p.finaliseTransforms()
                    p.updateStatus()
                except Exception:
                    err = sys.exc_info()[1]
                    # Guard the pause itself: if it raised here, the exception
                    # would end the monitoring thread for all tasks.
                    try:
                        p.pause()
                        outcome = "The offending task was paused."
                    except Exception:
                        outcome = "Pausing the offending task also failed: %s" % (sys.exc_info()[1],)
                    logger.error("Exception occurred in task monitoring loop: %s %s\n%s" % (err.__class__, err, outcome))
            if self._main_thread.should_stop():
                break

        # Interruptible pause of ~10 s between monitoring passes.
        for _ in range(100):
            if self._main_thread.should_stop():
                break
            time.sleep(0.1)
130
137
138 from Ganga.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice
139
154
157
159 """ Retrieve a job by id.
160 """
161 t = type(id)
162 if t is int:
163 try:
164 return self.objects[id]
165 except KeyError:
166 raise RegistryKeyError('Task id=%d not found'%id)
167 elif t is tuple:
168 ids = id
169 elif t is list:
170 ids = id.split(".")
171 else:
172 raise RegistryAccessError('Expected a job id: int, (int,int), or "int.int"')
173
174 if not len(ids) in [1,2]:
175 raise RegistryAccessError('Too many ids in the access tuple, 2-tuple (job,subjob) only supported')
176
177 try:
178 ids = [int(id) for id in ids]
179 except TypeError:
180 raise RegistryAccessError('Expeted a job id: int, (int,int), or "int.int"')
181 except ValueError:
182 raise RegistryAccessError('Expected a job id: int, (int,int), or "int.int"')
183
184 try:
185 j = self.objects[ids[0]]
186 except KeyError:
187 raise RegistryKeyError('Task %d not found'%ids[0])
188
189 if len(ids)>1:
190 try:
191 return j.transforms[ids[1]]
192 except IndexError:
193 raise RegistryKeyError('Transform %s not found' % ('.'.join([str(id) for id in ids])))
194 else:
195 return j
196
199
200 - def run(self,keep_going):
202
203 - def pause(self,keep_going):
205
206
207 from Ganga.GPIDev.Lib.Registry.RegistrySliceProxy import RegistrySliceProxy, _wrap, _unwrap
208
210 """This object is an access list of tasks.
211
212 The 'tasks' represents all existing tasks.
213
214 A subset of tasks may be created by slicing (e.g. tasks[-10:] last ten tasks)
215 or select (e.g. tasks.select(status='new') or tasks.select(10,20) tasks with
216 ids between 10 and 20). A new access list is created as a result of
217 slice/select. The new access list may be further restricted.
218
219 This object allows to perform collective operations listed below such as
220 run on all tasks in the current range. The keep_going=True
221 (default) means that the operation will continue despite possible errors
222 until all tasks are processed. The keep_going=False means that the
223 operation will bail out with an Exception on a first encountered error.
224 """
def remove(self, keep_going=True):
    """Remove every task in this access list.

    With keep_going=True (the default), processing continues past errors.
    With keep_going=False, the first error raises. The work is delegated to
    the wrapped slice ``self._impl``, and its result is returned unchanged.
    """
    target = self._impl
    return target.remove(keep_going=keep_going)
228
def run(self, keep_going=True):
    """Start (run) every task in this access list.

    With keep_going=True (the default), processing continues past errors.
    With keep_going=False, the first error raises. The work is delegated to
    the wrapped slice ``self._impl``, and its result is returned unchanged.
    """
    target = self._impl
    return target.run(keep_going=keep_going)
232
def pause(self, keep_going=True):
    """Pause every task in this access list.

    With keep_going=True (the default), processing continues past errors.
    With keep_going=False, the first error raises. The work is delegated to
    the wrapped slice ``self._impl``, and its result is returned unchanged.
    """
    target = self._impl
    return target.pause(keep_going=keep_going)
236
237 - def copy(self,keep_going=True):
240
def select(self, minid=None, maxid=None, **attrs):
    """ Select a subset of tasks. Examples:
    tasks.select(10): select tasks with ids higher or equal to 10;
    tasks.select(10,20) select tasks with ids in 10,20 range (inclusive);
    tasks.select(status='completed') select all tasks with status completed;
    tasks.select(name='some') select all tasks with some name;

    Each keyword value is passed through _unwrap() before being handed to
    the underlying slice's select(). The result is wrapped in a new
    TaskRegistrySliceProxy.
    """
    plain_attrs = dict((key, _unwrap(value)) for key, value in attrs.items())
    selected = self._impl.select(minid, maxid, **plain_attrs)
    return TaskRegistrySliceProxy(selected)
252
254 """ Access individual job. Examples:
255 tasks(10) : get job with id 10 or raise exception if it does not exist.
256 tasks((10,2)) : get transform number 2 of task 10 if exist or raise exception.
257 tasks('10.2')) : same as above
258 """
259 return _wrap(self._impl.__call__(x))
260
262 """ Get a slice. Examples:
263 tasks[2:] : get first two tasks,
264 tasks[-10:] : get last 10 tasks.
265 """
266 return _wrap(self._impl.__getslice__(i1,i2))
267
269 """ Get a job by positional index. Examples:
270 tasks[-1] : get last job,
271 tasks[0] : get first job,
272 tasks[1] : get second job.
273 """
274 return _wrap(self._impl.__getitem__(_unwrap(x)))
275
276
278 """Prints a more detailed table of tasks and their transforms"""
279 return self.__str__(False)
280
282 """Prints an overview over the currently running tasks"""
283 if config["tasks_show_help"]:
284 self.help(short = True)
285 config.setUserValue("tasks_show_help",False)
286 print "To show this help message again, type 'tasks.help()'."
287 print
288 print " The following is the output of "+markup("tasks.table()",fgcol("blue"))
289 short = False
290
291 fstring = " %5s | %17s | %30s | %9s | %33s | %5s\n"
292 lenfstring = 120
293 ds = "\n" + fstring % ("#", "Type", "Name", "State", "%4s: %4s/ %4s/ %4s/ %4s/ %4s" % (
294 "Jobs",markup("done",overview_colours["completed"])," "+markup("run",overview_colours["running"]),markup("fail",overview_colours["failed"]),markup("hold",overview_colours["hold"])," "+markup("bad",overview_colours["bad"])), "Float")
295 ds += "-"*lenfstring + "\n"
296 for p in self._impl.objects.values():
297 stat = "%4i: %4i/ %4i/ %4i/ %4i/ %4i" % (
298 p.n_all(), p.n_status("completed"),p.n_status("running"),p.n_status("failed"),p.n_status("hold"),p.n_status("bad"))
299 ds += markup(fstring % (p.id, p.__class__.__name__, p.name, p.status, stat, p.float), status_colours[p.status])
300 if short:
301 continue
302 for ti in range(0, len(p.transforms)):
303 t = p.transforms[ti]
304 stat = "%4i: %4i/ %4i/ %4i/ %4i/ %4s" % (
305 t.n_all(), t.n_status("completed"),t.n_status("running"),t.n_status("failed"),t.n_status("hold"),t.n_status("bad"))
306 ds += markup(fstring % ("%i.%i"%(p.id, ti), t.__class__.__name__, t.name, t.status, stat, ""), status_colours[t.status])
307 ds += "-"*lenfstring + "\n"
308 return ds + "\n"
309
310 _display = __str__
311
def help(self, short=False):
    """Print an introduction and command 'cheat sheet' for Ganga Tasks.

    The ``short`` argument is accepted, but the current body never reads it,
    so the printed text is the same either way.
    """
    def hl(text):
        # Highlight a command example in blue.
        return markup(text, fgcol("blue"))

    def heading(text):
        # Section headings are printed in red.
        return markup(text, fgcol("red"))

    print("")
    print(markup(" *** Ganga Tasks: Short Introduction and 'Cheat Sheet' ***", fgcol("blue")))
    print("")
    print(heading("Definitions: ") + "'Partition' - A unit of processing, for example processing a file or processing some events from a file")
    print(" 'Transform' - A group of partitions that have a common Ganga Application and Backend.")
    print(" 'Task' - A group of one or more 'Transforms' that can have dependencies on each other")
    print("")
    print(heading("Possible status values for partitions:"))
    print(' * "' + markup("ready", overview_colours["ready"]) + '" - ready to be executed ')
    print(' * "' + markup("hold", overview_colours["hold"]) + '" - dependencies not completed')
    print(' * "' + markup("running", overview_colours["running"]) + '" - at least one job tries to process this partition')
    print(' * "' + markup("attempted", overview_colours["attempted"]) + '"- tasks tried to process this partition, but has not yet succeeded')
    print(' * "' + markup("failed", overview_colours["failed"]) + '" - tasks failed to process this partition several times')
    print(' * "' + markup("bad", overview_colours["bad"]) + '" - this partition is excluded from further processing and will not be used as input to subsequent transforms')
    print(' * "' + markup("completed", overview_colours["completed"]) + '" ')
    print("")
    print(heading("Important commands:"))
    print(" Get a quick overview : " + hl("tasks") + " Get a detailed view : " + hl("tasks.table()"))
    print(" Access an existing task : " + hl("t = tasks(id)") + " Remove a Task : " + hl("tasks(id).remove()"))
    print(" Create a new (MC) Task : " + hl("t = MCTask()") + " Copy a Task : " + hl("nt = t.copy()"))
    print(" Show task configuration : " + hl("t.info()") + " Show processing status : " + hl("t.overview()"))
    print(" Set the float of a Task : " + hl("t.float = 100") + " Set the name of a task : " + hl("t.name = 'My Own Task v1'"))
    print(" Start processing : " + hl("t.run()") + " Pause processing : " + hl("t.pause()"))
    print(" Access Transform id N : " + hl("tf = t.transforms[N]") + " Pause processing of tf : " + hl("tf.pause()") + " # This command is reverted by using t.run()")
    print(" Transform Application : " + hl("tf.application") + " Transform Backend : " + hl("tf.backend"))
    print("")
    print(" Set parameter in all applications : " + hl("t.setParameter(my_software_version='1.42.0')"))
    print(" Set backend for all transforms : " + hl("t.setBackend(backend) , p.e. t.setBackend(LCG())"))
    print(" Limit on how often jobs are resubmitted : " + hl("tf.run_limit = 4"))
    print(" Manually change the status of partitions: " + hl("tf.setPartitionStatus(partition, 'status')"))
    print("")
    print(" For an ATLAS Monte Carlo Production Example and specific help type: " + hl("MCTask?"))
    print(" For an ATLAS Analysis Example and help type: " + hl("AnaTask?"))
    print("")

    # An "ADVANCED COMMANDS" section used to sit here behind `if not True:`,
    # so it was never printed. It listed t.insertTransform(N, transform),
    # t.removeTransform(N) and tf.application = TaskApp() (the application
    # must be a 'Task Version' of the usual application; contact the
    # developers to request one).
358