Package Ganga :: Package GPIDev :: Package Lib :: Package Registry :: Module JobRegistry'
[hide private]
[frames] | no frames]

Source Code for Module Ganga.GPIDev.Lib.Registry.JobRegistry'

  1  ################################################################################ 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: JobRegistry.py,v 1.1.2.1 2009-07-24 13:39:39 ebke Exp $ 
  5  ################################################################################ 
  6   
  7  # display default values for job list 
  8  from RegistrySlice import config 
  9  config.addOption('jobs_columns', 
 10                   ("fqid","status","name","subjobs","application","backend","backend.actualCE"), 
 11                   'list of job attributes to be printed in separate columns') 
 12   
 13  config.addOption('jobs_columns_width', 
 14                   {'fqid': 8, 'status':10, 'name':10, 'subjobs':8, 'application':15, 'backend':15, 'backend.actualCE':45}, 
 15                   'width of each column') 
 16   
 17  config.addOption('jobs_columns_functions', 
 18                   {'subjobs' : "lambda j: len(j.subjobs)", 'application': "lambda j: j.application._name", 'backend': "lambda j:j.backend._name"}, 
 19                   'optional converter functions') 
 20   
 21  config.addOption('jobs_columns_show_empty', 
 22                   ['fqid'], 
 23                   'with exception of columns mentioned here, hide all values which evaluate to logical false (so 0,"",[],...)') 
 24   
 25  config.addOption('jobs_status_colours', 
 26                              { 'new'        : 'fx.normal', 
 27                                'submitted'  : 'fg.orange', 
 28                                'running'    : 'fg.green', 
 29                                'completed'  : 'fg.blue', 
 30                                'failed'     : 'fg.red' 
 31                                }, 
 32                              'colours for jobs status') 
 33  import Ganga.Utility.logging 
 34  logger = Ganga.Utility.logging.getLogger()                               
 35   
 36  from Ganga.Core.GangaRepository.Registry import Registry, RegistryKeyError, RegistryAccessError 
 37   
38 -class JobRegistry(Registry):
39 - def getProxy(self):
40 slice = JobRegistrySlice(self.name) 41 slice.objects = self 42 return JobRegistrySliceProxy(slice)
43
44 - def getIndexCache(self,obj):
45 cached_values = ['status','id','name'] 46 c = {} 47 for cv in cached_values: 48 if cv in obj._data: 49 c[cv] = obj._data[cv] 50 slice = JobRegistrySlice("tmp") 51 for dpv in slice._display_columns: 52 c["display:"+dpv] = slice._get_display_value(obj, dpv) 53 return c
54
55 - def startup(self):
56 self._needs_metadata = True 57 super(JobRegistry,self).startup() 58 if len(self.metadata.ids()) == 0: 59 from Ganga.GPIDev.Lib.JobTree import JobTree 60 self.metadata._add(JobTree()) 61 self.jobtree = self.metadata[self.metadata.ids()[-1]]
62
63 - def getJobTree(self):
64 return self.jobtree
65 66 from RegistrySlice import RegistrySlice 67 68 69
70 -class JobRegistrySlice(RegistrySlice):
71 - def __init__(self,name):
72 super(JobRegistrySlice,self).__init__(name,display_prefix="jobs") 73 from Ganga.Utility.ColourText import Foreground, Background, Effects 74 fg = Foreground() 75 fx = Effects() 76 bg = Background() 77 try: 78 status_colours = config['jobs_status_colours'] 79 self.status_colours = dict( [ (k, eval(v)) for k,v in status_colours.iteritems() ] ) 80 except Exception,x: 81 logger.warning('configuration problem with colour specification: "%s"', str(x)) 82 status_colours = config.options['jobs_status_colours'].default_value 83 self.status_colours = dict( [ (k, eval(v)) for k,v in status_colours.iteritems() ] ) 84 self.fx = fx 85 self._proxyClass = JobRegistrySliceProxy
86
87 - def _getColour(self,obj):
88 return self.status_colours.get(obj.status,self.fx.normal)
89
90 - def __call__(self,id):
91 """ Retrieve a job by id. 92 """ 93 t = type(id) 94 if t is int: 95 try: 96 return self.objects[id] 97 except KeyError: 98 if self.name == 'templates': 99 raise RegistryKeyError('Template %d not found'%id) 100 else: 101 raise RegistryKeyError('Job %d not found'%id) 102 elif t is tuple: 103 ids = id 104 elif t is str: 105 ids = id.split(".") 106 else: 107 raise RegistryAccessError('Expected a job id: int, (int,int), or "int.int"') 108 109 if not len(ids) in [1,2]: 110 raise RegistryAccessError('Too many ids in the access tuple, 2-tuple (job,subjob) only supported') 111 112 try: 113 ids = [int(id) for id in ids] 114 except TypeError: 115 raise RegistryAccessError('Expeted a job id: int, (int,int), or "int.int"') 116 except ValueError: 117 raise RegistryAccessError('Expected a job id: int, (int,int), or "int.int"') 118 119 try: 120 j = self.objects[ids[0]] 121 except KeyError: 122 if self.name == 'templates': 123 raise RegistryKeyError('Template %d not found'%ids[0]) 124 else: 125 raise RegistryKeyError('Job %d not found'%ids[0]) 126 127 if len(ids)>1: 128 try: 129 return j.subjobs[ids[1]] 130 except IndexError: 131 raise RegistryKeyError('Subjob %s not found' % ('.'.join([str(id) for id in ids]))) 132 else: 133 return j
134
135 - def submit(self,keep_going):
136 self.do_collective_operation(keep_going,'submit')
137
138 - def kill(self,keep_going):
139 self.do_collective_operation(keep_going,'kill')
140
141 - def resubmit(self,keep_going):
142 self.do_collective_operation(keep_going,'resubmit')
143
144 - def fail(self,keep_going,force):
145 raise GangaException('fail() is deprecated, use force_status("failed") instead')
146
147 - def force_status(self,status,keep_going,force):
148 self.do_collective_operation(keep_going,'force_status',status,force=force)
149
150 - def remove(self,keep_going,force):
151 self.do_collective_operation(keep_going,'remove',force=force)
152 153 154 from RegistrySliceProxy import RegistrySliceProxy, _wrap, _unwrap 155
156 -class JobRegistrySliceProxy(RegistrySliceProxy):
157 """This object is an access list of jobs defined in Ganga. 158 159 The 'jobs' represents all existing jobs. 160 161 A subset of jobs may be created by slicing (e.g. jobs[-10:] last ten jobs) 162 or select (e.g. jobs.select(status='new') or jobs.select(10,20) jobs with 163 ids between 10 and 20). A new access list is created as a result of 164 slice/select. The new access list may be further restricted. 165 166 This object allows to perform collective operations listed below such as 167 kill or submit on all jobs in the current range. The keep_going=True 168 (default) means that the operation will continue despite possible errors 169 until all jobs are processed. The keep_going=False means that the 170 operation will bail out with an Exception on a first encountered error. 171 172 173 """
174 - def submit(self,keep_going=True):
175 """ Submit all jobs.""" 176 self._impl.submit(keep_going=keep_going)
177
178 - def resubmit(self,keep_going=True):
179 """ Resubmit all jobs.""" 180 return self._impl.resubmit(keep_going=keep_going)
181
182 - def kill(self,keep_going=True):
183 """ Kill all jobs.""" 184 return self._impl.kill(keep_going=keep_going)
185
186 - def remove(self,keep_going=True,force=False):
187 """ Remove all jobs.""" 188 return self._impl.remove(keep_going=keep_going,force=force)
189
190 - def fail(self,keep_going=True,force=False):
191 """ Fail all jobs.""" 192 return self._impl.fail(keep_going=keep_going,force=force)
193
194 - def force_status(self, status, keep_going=True, force=False):
195 """ Force status of all jobs to 'completed' or 'failed'.""" 196 return self._impl.force_status(status,keep_going=keep_going,force=force)
197
198 - def copy(self,keep_going=True):
199 """ Copy all jobs. """ 200 return JobRegistrySliceProxy(self._impl.copy(keep_going=keep_going))
201 202
203 - def __call__(self,x):
204 """ Access individual job. Examples: 205 jobs(10) : get job with id 10 or raise exception if it does not exist. 206 jobs((10,2)) : get subjobs number 2 of job 10 if exist or raise exception. 207 jobs('10.2')) : same as above 208 """ 209 return _wrap(self._impl.__call__(x))
210
211 - def __getslice__(self, i1,i2):
212 """ Get a slice. Examples: 213 jobs[2:] : get first two jobs, 214 jobs[-10:] : get last 10 jobs. 215 """ 216 return _wrap(self._impl.__getslice__(i1,i2))
217
218 - def __getitem__(self,x):
219 """ Get a job by positional index. Examples: 220 jobs[-1] : get last job, 221 jobs[0] : get first job, 222 jobs[1] : get second job. 223 """ 224 return _wrap(self._impl.__getitem__(_unwrap(x)))
225 226 from Ganga.Utility.external.ordereddict import oDict
227 -def jobSlice(joblist):
228 """create a 'JobSlice' from a list of jobs 229 example: jobSlice([j for j in jobs if j.name.startswith("T1:")])""" 230 slice = JobRegistrySlice("manual slice") 231 slice.objects = oDict([(j.fqid, _unwrap(j)) for j in joblist]) 232 return _wrap(slice)
233 234 from Ganga.Runtime.GPIexport import exportToGPI 235 exportToGPI("jobSlice", jobSlice, "Functions")#, "Create a job slice from a job list") 236