Package Ganga :: Package Core :: Package JobRepositoryXML :: Module Repository
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Core.JobRepositoryXML.Repository

  1  import Ganga.Utility.logging 
  2  logger = Ganga.Utility.logging.getLogger(modulename=1) 
  3   
  4  version = '5.0' 
  5   
  6  import os,shutil 
  7   
def makedir(d):
    """Create the single directory *d*; the parent must already exist."""
    os.mkdir(d)
def removedir(d):
    """Recursively delete the directory tree rooted at *d*."""
    shutil.rmtree(d)
def makedirs(d):
    """Create directory *d* and any missing parent directories.

    Unlike a bare os.makedirs, an already-existing directory is silently
    accepted (EEXIST is swallowed); any other OSError is propagated.
    """
    try:
        os.makedirs(d)
    except OSError as x:  # was 'except OSError,x' -- Python-2-only syntax
        import errno
        if x.errno != errno.EEXIST:
            raise
21 22 from VStreamer import to_file, from_file 23 from Counter import Counter 24
class Repository:
    """Filesystem-based XML job repository.

    Each job lives in a numbered directory under the repository root; the
    serialized job object is stored in a file named 'data' inside it.
    Subjobs live in numbered subdirectories of their master job's directory.
    """

    def __init__(self, dir):
        """Attach to (and if needed create) the repository rooted at *dir*."""
        self.dir = dir
        self.init()

    def init(self):
        """Create the repository directory and the id counters."""
        makedirs(self.dir)
        # dictionary of counters (None is the main counter, others are for subjobs)
        self.counters = {None: Counter(self.dir)}

    def registerJobs(self, jobs, masterJob=None):
        """ Register new jobs (or subjobs if master job is specified).
        After registration the objects must be committed.
        """
        dir = self.dir
        master = masterJob

        def make_new_ids(j, cnt):
            # Counter key is the master job id (None selects the top-level
            # counter); a new counter is created lazily per master job.
            try:
                cntid = j.id
            except AttributeError:
                cntid = None
            try:
                counter = self.counters[cntid]
            except KeyError:
                counter = Counter(dir)
                self.counters[cntid] = counter
            return counter.make_new_ids(cnt)

        def makejob(j, id):
            # Assign the id and create the job's directory.
            j.id = id
            newdir = os.path.join(dir, str(j.id))
            makedir(newdir)
            return newdir

        if master:
            # subjobs live under the master job's directory; 'dir' must be
            # rebound BEFORE make_new_ids so the counter is created there
            dir = os.path.join(dir, str(master.id))
            ids = make_new_ids(master, len(jobs))
        else:
            ids = make_new_ids(None, len(jobs))

        for (j, id) in zip(jobs, ids):
            makejob(j, id)
            if master is not None:  # was 'not master is None'
                j._setParent(master)

    def commitJobs(self, jobs):
        """ Commit jobs (or subjobs) which are specified in the list.
        """
        for j in jobs:
            dir = os.path.join(self.dir, j.getFQID(os.sep))
            # close the data file explicitly (the original leaked the handle)
            f = open(os.path.join(dir, 'data'), 'w')
            try:
                to_file(j, f)
            finally:
                f.close()

    def deleteJobs(self, jobids):
        """Delete the directories of the given jobs.

        An id may be a plain integer (top-level job) or a sequence
        (master id, subjob id) identifying a subjob.
        """
        for id in jobids:
            try:
                subpath = os.sep.join([str(i) for i in id])
                # update the corresponding counter for subjobs
                # the use-case is limited to the rollback of subjobs in case of submission failure
                self.counters[id[0]].subtract()
            except TypeError:
                # id is not iterable -> a top-level job id
                subpath = str(id)
            removedir(os.path.join(self.dir, subpath))

    def getJobIds(self, meta=None):
        """Return the ids of all top-level jobs in the repository."""
        return self._getJobIds(self.dir, meta)

    def _getJobIds(self, dir, meta=None):
        """Return the integer-named entries of *dir* (the job ids).

        Metadata selection is not implemented; a warning is emitted when a
        non-empty *meta* mapping is passed.  (Default changed from the
        mutable ``{}`` to ``None``; behavior is identical.)
        """
        if meta:
            logger.warning('metadata selection not implemented (meta=%s)', meta)

        ids = []
        for x in os.listdir(dir):
            try:
                ids.append(int(x))
            except ValueError:
                pass  # not a job directory
        return ids

    def checkoutJobs(self, meta=None):
        """Checkout all top-level jobs (with subjobs) from the repository."""
        return self._checkoutJobs(self.dir, meta)

    def _checkoutJobs(self, dir, meta=None):
        """ Checkout the jobs and return a list of job objects.
        """
        jobs = []
        error_summary = {}   # summary of errors (exception reprs are keys, occurence count is value)
        incomplete_ids = []  # ids of incomplete jobs (schema mismatch etc.)
        bad_ids = []         # ids of ignored jobs (I/O errors)
        entries_cnt = 0

        master_ids = self._getJobIds(dir, meta)

        # add a new error entry to error_summary
        def add_error(e):
            re = str(e)
            error_summary.setdefault(re, 0)
            error_summary[re] += 1

        # read a job id from dir and append it to jobs list; returns the job
        # object or None on failure
        # masterid (if set) is used for reporting purposes only
        def read_job(dir, id, jobs, masterid=None):
            def fqid():
                if masterid is None:
                    return str(id)
                else:
                    return '.'.join([str(masterid), str(id)])
            try:
                # read job data and reconstruct the object; close the file
                # handle explicitly (the original leaked it)
                f = open(os.path.join(dir, str(id), 'data'))
                try:
                    j, errors = from_file(f)
                finally:
                    f.close()
                if errors:  # data errors
                    j.status = 'incomplete'
                    for e in errors:
                        add_error(e)
                    incomplete_ids.append(fqid())
                jobs.append(j)
                return j
            except KeyboardInterrupt:
                # FIXME: any special cleanup needed?
                raise
            except Exception as x:  # I/O and parsing errors
                msg = 'Cannot read job %s: %s' % (fqid(), repr(x))
                add_error(msg)
                bad_ids.append(fqid())
                logger.debug(msg)
                Ganga.Utility.logging.log_user_exception(logger, debug=True)

        # main loop
        import time
        progress = 0
        t0 = time.time()
        for id in master_ids:
            # progress log
            if progress % 100 == 0 and progress != 0:
                logger.info('Loaded %d/%d jobs in %d seconds', progress, len(master_ids), time.time() - t0)
            progress += 1
            # read top-level job
            j = read_job(dir, id, jobs)
            entries_cnt += 1
            if j:  # FIXME: hardcoded subjobs handling
                from Ganga.GPIDev.Lib.GangaList.GangaList import makeGangaList
                subjobs_dir = os.path.join(dir, str(id))
                if os.path.isdir(subjobs_dir):
                    # get all subjob ids
                    subjob_ids = self._getJobIds(subjobs_dir)
                    entries_cnt += len(subjob_ids)
                    subjobs = []
                    for sid in subjob_ids:
                        # read-in subjob objects
                        s = read_job(subjobs_dir, sid, subjobs, masterid=id)
                        # read_job returns None on failure; the original
                        # crashed here with AttributeError in that case
                        if s is not None:
                            s._setParent(j)
                    # initialize correctly the subjobs attribute
                    j._data['subjobs'] = makeGangaList(subjobs)
                    j.__setstate__(j.__dict__)

        # print out reports
        logger.info('Loaded total of %d job entries (including all subjobs)', entries_cnt)
        logger.info('Loaded total of %d master job entries:', len(master_ids))
        logger.info('Load time %d seconds', time.time() - t0)

        if bad_ids:
            logger.error('Missing job entries due to I/O errors: %d/%d', len(bad_ids), entries_cnt)
        if incomplete_ids:
            logger.error('Job entries loaded in incomplete state due to data errors: %d/%d', len(incomplete_ids), entries_cnt)
            if len(incomplete_ids) < 100:
                logger.error('Incomplete job ids: %s', incomplete_ids)
        if error_summary:
            logger.error('Summary of problems:')
            for re, cnt in error_summary.items():
                logger.error(' - %d job entries: %s', cnt, re)

        return jobs

    def releaseAllLocks(self):
        """No-op: this repository backend does not hold locks."""
        logger.debug('releaseAllLocks')

    def setJobTree(self, x):
        """Not supported by this repository backend."""
        pass

    def getJobTree(self):
        """Not supported by this repository backend."""
        pass

    def resetAll(self):
        """Remove the whole repository directory tree and re-initialize it."""
        removedir(self.dir)
        self.init()