1 import Ganga.Utility.logging
2 logger = Ganga.Utility.logging.getLogger(modulename=1)
3
4 version = '5.0'
5
6 import os,shutil
7
10
13
15 try:
16 os.makedirs(d)
17 except OSError,x:
18 import errno
19 if x.errno != errno.EEXIST:
20 raise
21
22 from VStreamer import to_file, from_file
23 from Counter import Counter
24
# Keep the repository directory on the instance, then call self.init().
27 self.dir = dir
28 self.init()
29
34
""" Register new jobs (or subjobs if master job is specified).
After registration the objects must be committed.

Each job gets a newly allocated id and a directory named after that id,
created below the repository directory (or below the master job's
directory for subjobs). Subjobs are also given the master as parent.
"""

master = masterJob
basedir = self.dir

if master:
    # subjobs are stored inside the directory of their master job and
    # take their ids from the counter kept for that master
    basedir = os.path.join(basedir,str(master.id))
    owner = master
else:
    owner = None

count = len(jobs)

# Counters are cached on the instance, keyed by the id of the owning job
# (None when the owner has no id attribute, as for top-level jobs).
cntid = getattr(owner,'id',None)
try:
    counter = self.counters[cntid]
except KeyError:
    counter = Counter(basedir)
    self.counters[cntid] = counter
ids = counter.make_new_ids(count)

for j,newid in zip(jobs,ids):
    j.id = newid
    # the directory name is derived from the id as stored on the job
    makedir(os.path.join(basedir,str(j.id)))
    if master is not None:
        j._setParent(master)
73
""" Commit jobs (or subjobs) which are specified in the list.

Each job is written with to_file() into the file 'data' inside the
directory derived from its fully qualified id below the repository
directory.
"""

for j in jobs:
    jobdir = os.path.join(self.dir,j.getFQID(os.sep))
    datafile = open(os.path.join(jobdir,'data'),'w')
    try:
        to_file(j,datafile)
    finally:
        # close explicitly so the data is flushed to disk here and not
        # whenever the file object happens to be garbage collected
        datafile.close()
82
84
# Remove the stored directories of the given jobs. An entry of jobids is
# either a plain id or a sequence of ids giving the path to a subjob.
for jobid in jobids:
    try:
        subpath = os.sep.join(map(str,jobid))
        # call subtract() on the counter cached under the first id of the
        # path (presumably the master job's subjob counter -- confirm).
        # NOTE(review): counters are only created on registration, so a
        # KeyError propagates from here if none is cached for that id --
        # confirm this is intended.
        self.counters[jobid[0]].subtract()
    except TypeError:
        # a plain (non-sequence) id ends up here
        subpath = str(jobid)
    removedir(os.path.join(self.dir,subpath))
94
97
99
100
101 if meta:
102 logger.warning('metadata selection not implemented (meta=%s)',meta)
103
104 ids = []
105 for x in os.listdir(dir):
106 try:
107 ids.append(int(x))
108 except ValueError:
109 pass
110 return ids
111
114
116 """ Checkout the jobs and return a list of job objects.
117 """
118
119 jobs = []
120 error_summary = {}
121 incomplete_ids = []
122 bad_ids = []
123 entries_cnt = 0
124
125 master_ids = self._getJobIds(dir,meta)
126
127
128 def add_error(e):
129
130 re = str(e)
131 error_summary.setdefault(re,0)
132 error_summary[re] += 1
133
134
135
136 def read_job(dir,id,jobs,masterid=None):
137 def fqid():
138 if masterid is None:
139 return str(id)
140 else:
141 return '.'.join([str(masterid),str(id)])
142 try:
143
144 j,errors = from_file(file(os.path.join(dir,str(id),'data')))
145 if errors:
146 j.status = 'incomplete'
147 for e in errors:
148 add_error(e)
149 incomplete_ids.append(fqid())
150 jobs.append(j)
151 return j
152 except KeyboardInterrupt:
153
154 raise
155 except Exception,x:
156 msg = 'Cannot read job %s: %s'%(fqid(),repr(x))
157 add_error(msg)
158 bad_ids.append(fqid())
159 logger.debug(msg)
160 Ganga.Utility.logging.log_user_exception(logger,debug=True)
161
162
163 import time
164 progress = 0
165 t0=time.time()
166 for id in master_ids:
167
168 if progress%100 == 0 and progress!=0:
169 logger.info('Loaded %d/%d jobs in %d seconds',progress,len(master_ids),time.time()-t0)
170 progress += 1
171
172 j = read_job(dir,id,jobs)
173 entries_cnt += 1
174 if j:
175 from Ganga.GPIDev.Lib.GangaList.GangaList import makeGangaList
176 subjobs_dir = os.path.join(dir,str(id))
177 if os.path.isdir(subjobs_dir):
178
179 subjob_ids = self._getJobIds(subjobs_dir)
180 entries_cnt += len(subjob_ids)
181 subjobs = []
182 for sid in subjob_ids:
183
184 s = read_job(subjobs_dir,sid,subjobs,masterid=id)
185 s._setParent(j)
186
187 j._data['subjobs'] = makeGangaList(subjobs)
188 j.__setstate__(j.__dict__)
189
190
191 logger.info('Loaded total of %d job entries (including all subjobs)', entries_cnt)
192 logger.info('Loaded total of %d master job entries:',len(master_ids))
193 logger.info('Load time %d seconds',time.time()-t0)
194
195 if bad_ids:
196 logger.error('Missing job entries due to I/O errors: %d/%d',len(bad_ids),entries_cnt)
197 if incomplete_ids:
198 logger.error('Job entries loaded in incomplete state due to data errors: %d/%d',len(incomplete_ids),entries_cnt)
199 if len(incomplete_ids) < 100:
200 logger.error('Incomplete job ids: %s',incomplete_ids)
201 if error_summary:
202 logger.error('Summary of problems:')
203 for re,cnt in error_summary.iteritems():
204 logger.error(' - %d job entries: %s',cnt,re)
205
206 return jobs
207
210
213
216
220