Package Ganga :: Package GPIDev :: Package Lib :: Package Job :: Module JobTime'
[hide private]
[frames] | no frames]

Source Code for Module Ganga.GPIDev.Lib.Job.JobTime'

  1  import os 
  2  import datetime 
  3  import time 
  4   
  5  from Ganga.GPIDev.Base import GangaObject 
  6  from Ganga.GPIDev.Schema import * 
  7   
  8  import Ganga.Utility.Config 
  9  Ganga.Utility.Config.config_scope['datetime'] = datetime 
 10           
11 -class JobTime(GangaObject):
12 """Job timestamp access. 13 In development 14 15 Changes in the status of a Job are timestamped - a datetime object 16 is stored in the dictionary named 'timestamps', in Coordinated 17 Universal Time(UTC). More information on datetime objects can be 18 found at: 19 20 http://docs.python.org/library/datetime.html 21 22 Datetime objects can be subtracted to produce a 'timedelta' object. 23 More information about these can be found at the above address. 24 '+', '*', and '/' are not supported by datetime objects. 25 26 Datetime objects can be formatted into strings using the 27 .strftime(format_string) application, and the strftime codes. 28 e.g. %Y -> year as integer 29 %a -> abbreviated weekday name 30 %M -> minutes as inetger 31 32 The full list can be found at: 33 http://docs.python.org/library/datetime.html#strftime-behavior 34 35 Standard status types with built in access methods are: 36 -'new' 37 -'submitted' 38 -'running' 39 -'completed' 40 -'killed' 41 -'failed' 42 43 These return a string with default format %Y/%m/%d @ %H:%M:%S. A 44 custom format can be specified in the arguement. 45 46 Any information stored within the timestamps dictionary can also be 47 extracted in the way as in would be for a standard, non-application 48 specific python dictionary. 49 50 For a table display of the Job's timestamps use .time.display(). For 51 timestamps details from the backend use .time.details() 52 53 54 """ 55 56 timestamps = {} 57 sj_statlist = [] 58 59 _schema = Schema(Version(0,0),{'timestamps' : SimpleItem(defvalue={},doc="Dictionary containing timestamps for job", summary_print='_timestamps_summary_print') 60 }) 61 62 _category = 'jobtime' 63 _name = 'JobTime' 64 _exportmethods = ['display', 65 'new', 66 'submitting', 67 'submitted', 68 'backend_running', 69 'backend_final', 70 'completing', 71 'final', 72 'runtime', 73 'waittime', 74 'submissiontime', 75 'details', 76 'printdetails'] 77
78 - def __init__(self):
79 super(JobTime, self).__init__() 80 self.timestamps = {} 81 self.sj_statlist = [] #this makes sure the contents of the list don't get copied when the Job does.
82
83 - def __deepcopy__(self,memo):
84 obj = super(JobTime, self).__deepcopy__(memo) 85 obj.newjob() 86 return obj
87
88 - def newjob(self):
89 """Timestamps job upon creation. 90 """ 91 t = datetime.datetime.utcnow() 92 self.timestamps['new'] = t
93
94 - def timenow(self, status):
95 """Updates timestamps as job status changes. 96 """ 97 j = self.getJobObject() 98 t_now = datetime.datetime.utcnow() 99 b_list = ['running', 'completing', 'completed', 'failed'] 100 final = ['killed', 'failed', 'completed'] 101 backend_final = ['failed', 'completed'] 102 ganga_master = ['new', 'submitting', 'killed'] 103 104 logger.debug("Job %d called timenow('%s')", j.id, status) 105 106 #standard method: 107 if not j.subjobs: 108 #backend stamps 109 if status in b_list: 110 for childstatus in b_list: 111 be_statetime = j.backend.getStateTime(childstatus) 112 if be_statetime != None: 113 if childstatus in backend_final: 114 self.timestamps["backend_final"] = be_statetime 115 logger.debug("Wrote 'backend_final' to timestamps.") 116 else: 117 self.timestamps["backend_"+childstatus] = be_statetime 118 logger.debug("Wrote 'backend_%s' to timestamps.", childstatus) 119 if childstatus==status: break 120 #ganga stamps 121 if status in final: 122 self.timestamps["final"] = t_now 123 logger.debug("Wrote 'final' to timestamps.") 124 else: 125 self.timestamps[status] = t_now 126 logger.debug("Wrote '%s' to timestamps.", status) 127 128 #subjobs method: 129 if j.master: #identifies subjobs 130 logger.debug("j.time.timenow() caught subjob %d.%d in the '%s' status", j.master.id, j.id, status) 131 132 for written_status in j.time.timestamps.keys(): 133 if written_status not in j.master.time.sj_statlist: 134 j.master.time.sj_statlist.append(written_status) 135 logger.debug("written_status: '%s' written to sj_statlist", written_status) 136 137 #master job method 138 if j.subjobs: #identifies master job 139 logger.debug("j.time.timenow() caught master job %d in the '%s' status", j.id, status) 140 if status in ganga_master: ## don't use subjob stamp for these 141 self.timestamps[status] = t_now 142 logger.debug("status: '%s' in ganga_master written to master timestamps.", status) 143 else: 144 for state in self.sj_statlist: 145 if state not in ganga_master: 146 j.time.timestamps[state] = self.sjStatList_return(state) 147 logger.debug("state: '%s' from sj_statlist to written to master timestamps.", state) 148 else: 149 pass
150
151 - def sjStatList_return(self, status):
152 list = [] 153 final = ['backend_final', 'final'] 154 j = self.getJobObject() 155 for sjs in j.subjobs: 156 try: 157 if type(sjs.time.timestamps[status]) == datetime.datetime: 158 list.append(sjs.time.timestamps[status]) 159 else: 160 logger.debug('Attempt to add a non datetime object in the timestamp, job=%d, subjob=%d',j.id,sjs.id) 161 except KeyError: 162 logger.debug("Status '%s' not found in timestamps of job %d.%d.", status, sjs.master.id, sjs.id) 163 list.sort() 164 try: 165 if status in final: 166 return list[-1] 167 return list[0] 168 except IndexError: 169 logger.debug("IndexError: ID: %d, Status: '%s', length of list: %d", j.id, status, len(list)) #change this to a more appropriate debug. 170 pass
171
172 - def display(self, format="%Y/%m/%d %H:%M:%S"): ## Justin 10.9.09: I think 'ljust' might be just as good if not better than 'rjust' here:
173 """Displays existing timestamps in a table. 174 175 Format can be specified by typing a string of the appropriate strftime() behaviour codes as the arguement. 176 e.g. '%H:%M:%S' ==> 13:55:01 177 178 For a full list of codes see 179 http://docs.python.org/library/datetime.html?#strftime-behavior 180 """ 181 retstr='' 182 T = datetime.datetime.now() 183 tstring = T.strftime(format) 184 length = len(tstring) 185 times = [0 for k in self.timestamps.keys()] 186 for i in range(0, len(self.timestamps.keys())): 187 try: 188 times[i] = self.timestamps[self.timestamps.keys()[i]].strftime(format).rjust(length) + ' - ' + self.timestamps.keys()[i] 189 except AttributeError: 190 times[i] = str(self.timestamps[self.timestamps.keys()[i]]).rjust(length) + ' - ' + self.timestamps.keys()[i] 191 192 times.sort() ##try to make chronological - can fail when timestamps are the same to nearest sec -> becomes alphabetical... 193 retstr = retstr + '\n' + 'Time (UTC)'.rjust(length) + ' Status' + '\n' 194 for i in range(0,21): 195 retstr = retstr + '- ' 196 retstr = retstr+'\n' 197 for i in range (0, len(times)): 198 retstr = retstr + times[i] + '\n' 199 return retstr
200
201 - def _timestamps_summary_print(self,value,verbosity_level):
202 """Used to display timestamps when JobTime object is displayed. 203 """ 204 return self.display()
205 206 #This didn't work: 207 # 208 #def __str__(self): 209 # """ string cast """ 210 # return self.display() 211
212 - def details(self, subjob=None):
213 """Obtains all timestamps available from the job's specific backend. 214 215 Subjob arguement: None = default 216 'all' = gets details for ALL SUBJOBS. You have been warned. 217 int = gets details for subjob number 'int' 218 219 No argument is required for a job with no subjobs. 220 """ 221 j = self.getJobObject() 222 idstr = '' 223 detdict = {} 224 225 #If job is SUBJOB do the normal procedure. Not sure this clause is neccessary as subjobs will be caught normally 226 if j.master: 227 logger.debug("j.time.details(): subjob %d.%d caught.", j.master.id, j.id) 228 detdict = j.backend.timedetails() 229 return detdict 230 231 #If job is MASTER iterate over subjobs and do normal method. This isn't going to be ideal for a large number of subjobs 232 if j.subjobs: 233 logger.debug("j.time.details(): master job %d caught.", j.id) 234 idstr = str(j.id) 235 236 # User wants 'all' 237 if subjob == 'all': 238 keyin = None 239 240 # NOTE: The interactive loop below was more an exercise for learning how 'keyin' is used than a useful addition. 241 # ask whether user really wants to print timedetails for all their jobs: 242 while keyin == None: 243 keyin = raw_input("Are you sure you want details for ALL %d subjobs(y/n)?" %len(j.subjobs)) 244 #if yes carry on at for loop 245 if keyin == 'y': 246 pass 247 #if no return None. Doesn't execute rest of method 248 elif keyin == 'n': 249 return None 250 #if something else - asks again 251 else: 252 print "y/n please!" 253 keyin = None 254 255 for jobs in j.subjobs: 256 subidstr = idstr + '.' + str(jobs.id) 257 logger.debug("Subjob: %d, Backend ID: %d", jobs.id, jobs.backend.id) #String needs more info if it is going to stay in. 258 detdict[subidstr] = jobs.backend.timedetails() 259 return detdict 260 261 # no arguement specified 262 elif subjob == None: 263 logger.debug("j.time.details(): no subjobs specified for this master job.") 264 return None 265 266 # Subjob id or string passed 267 else: 268 # string = error 269 if type(subjob) != int: 270 raise TypeError("Subjob id requires type 'int'") 271 # subjob id supplied 272 for sj in j.subjobs: 273 if sj.id == subjob: 274 logger.debug("Subjob: %d, Backend ID: %d", sj.id, sj.backend.id) 275 detdict = sj.backend.timedetails() 276 return detdict 277 else: 278 pass 279 if subjob >= len(j.subjobs): 280 logger.warning("Index '%s' is out of range. Corresponding subjob does not exist.", str(subjob)) 281 return None 282 283 logger.debug("subjob arguement '%s' has failed to be caught and dealt with.", subjob) 284 return None 285 286 detdict = j.backend.timedetails() ## called if no subjobs 287 return detdict
288
289 - def printdetails(self, subjob=None):
290 """Prints backend details to screen by calling details() and printing the returned dictionary. 291 """ 292 j = self.getJobObject() 293 if subjob == 'all': 294 #the warning and action taken below are pretty annoying, but I was unsure how to deal with the request to print the details for all n subjobs, which seems unlikely to be made. 295 logger.warning("It might be unwise to print all subjobs details. Use details() and extract relevant info from dictionary.") 296 return None 297 pd = self.details(subjob) 298 for key in pd.keys(): 299 print key, '\t', pd[key]
300
301 - def runtime(self):
302 """Method which returns the 'runtime' of the specified job. 303 304 The runtime is calculated as the duration between the job entering the 'running' state and the job entering the 'completed' state. 305 """ 306 end_list = ['killed', 'completed', 'failed'] 307 end_stamps = {} 308 309 # if master job, sum: 310 j = self.getJobObject() 311 if j.subjobs: 312 masterrun = datetime.timedelta(0, 0, 0) 313 for jobs in j.subjobs: 314 masterrun = masterrun + jobs.time.runtime() 315 return masterrun 316 #all other jobs: 317 return self.duration('backend_running', 'backend_final')
318
319 - def waittime(self):
320 """Method which returns the waiting time of the specified job. 321 322 The waiting time is calculated as the duration between the job entering the 'submitted' state and entering the 'running' state. 323 """ 324 #master job: 325 j = self.getJobObject() 326 if j.subjobs: 327 start_list = [] 328 end_list = [] 329 for jobs in j.subjobs: 330 start_list.append(jobs.time.timestamps['submitted']) 331 end_list.append(jobs.time.timestamps['backend_running']) 332 start_list.sort() 333 end_list.sort() 334 start = start_list[0] 335 end = end_list[len(end_list) - 1] 336 masterwait = end - start 337 return masterwait 338 #all other jobs: 339 return self.duration('submitted', 'backend_running')
340
341 - def submissiontime(self):
342 """Method which returns submission time of specified job. 343 344 Calculation: sub_time = submitted - submitting. 345 """ 346 j = self.getJobObject() 347 if j.subjobs: 348 start_list = [] 349 end_list = [] 350 for jobs in j.subjobs: 351 end_list.append(jobs.time.timestamps['submitted']) 352 end_list.sort() 353 start = j.time.timestamps['submitting'] 354 end = end_list[len(end_list) - 1] 355 mastersub = end - start 356 return mastersub 357 return self.duration('submitting', 'submitted')
358
359 - def duration(self, start, end):
360 """Returns duration between two specified timestamps as timedelta object. 361 """ 362 if start in self.timestamps.keys(): 363 if end in self.timestamps.keys(): 364 s,e = self.timestamps[start],self.timestamps[end] 365 s_micro, e_micro = datetime.timedelta(0, 0, s.microsecond), datetime.timedelta(0, 0, e.microsecond) 366 e,s = e-e_micro, s-s_micro 367 td = e-s 368 369 #method for rounding removed because timestamps aren't always recorded with microsecond precision, and stamping accuracy isn't high enough to justify doing so 370 # ds = td.days 371 # secs = td.seconds 372 # micros = td.microseconds 373 # if micros >= 500000: 374 # secs +=1 375 376 dur = td #datetime.timedelta(days=ds, seconds=secs) 377 return dur 378 else: 379 logger.warning("Could not calculate duration: '%s' not found.", end) 380 else: 381 logger.warning("Could not calculate duration: '%s' not found.", start) 382 return None
383 384 385
386 - def statetime(self, status, format=None):
387 """General method for obtaining the specified timestamp in specified format. 388 """ 389 if status not in self.timestamps: 390 logger.debug("Timestamp '%s' not available.", status) 391 return None 392 if format != None: 393 return self.timestamps[status].strftime(format) 394 return self.timestamps[status]
395
396 - def new(self, format=None):
397 """Method for obtaining 'new' timestamp. 398 """ 399 return self.statetime('new', format)
400
401 - def submitting(self, format=None):
402 """Method for obtaining 'submitting' timestamp. 403 """ 404 return self.statetime('submitting', format)
405
406 - def submitted(self, format=None):
407 """Method for obtaining 'submitted' timestamp. 408 """ 409 return self.statetime('submitted', format)
410
411 - def backend_running(self, format=None):
412 """Method for obtaining 'backend_running' timestamp. 413 """ 414 return self.statetime('backend_running', format)
415
416 - def backend_final(self, format=None):
417 """Method for obtaining 'backend_final' timestamp. 418 """ 419 return self.statetime('backend_final', format)
420
421 - def completing(self, format=None):
422 """Method for obtaining 'completing' timestamp. 423 """ 424 return self.statetime('completing', format)
425
426 - def final(self, format=None):
427 """Method for obtaining 'final' timestamp. 428 """ 429 return self.statetime('final', format)
430