1
2
3
4
5
6 from GangaRepository import GangaRepository, PluginManagerError, EmptyGangaObject, RepositoryError, InaccessibleObjectError
7 from Ganga.Utility.Config import getConfig
8 import os, os.path, fcntl, time, errno
9
10 from SessionLock import SessionLockManager
11
12 import Ganga.Utility.logging
13 logger = Ganga.Utility.logging.getLogger()
14
15 from Ganga.Core.GangaRepository.PickleStreamer import to_file as pickle_to_file
16 from Ganga.Core.GangaRepository.PickleStreamer import from_file as pickle_from_file
17
18 from Ganga.Core.GangaRepository.VStreamer import to_file as xml_to_file
19 from Ganga.Core.GangaRepository.VStreamer import from_file as xml_from_file
20
21 from Ganga.GPIDev.Lib.GangaList.GangaList import makeGangaListByRef
22 from Ganga.GPIDev.Base.Objects import Node
23
24
import sys

# Pre-2.6 interpreters get the (long deprecated) ``sets.Set`` fallback;
# anything newer aliases the builtin ``set``.  0x020600F0 == 2.6.0 final.
if sys.hexversion >= 0x020600F0:
    Set = set
else:
    from sets import Set


# One-shot flag: the "how to remove incomplete objects" hint below is only
# ever printed once per Ganga session.
printed_explanation = False
33
35 """Writes a file safely, raises IOError on error"""
36 if not os.path.exists(fn):
37
38 try:
39 to_file(obj, file(fn,"w"), ignore_subs)
40 return
41 except IOError, e:
42 raise IOError("Could not write file '%s' (%s)" % (fn,e))
43 try:
44 tmpfile = open(fn + ".new", "w")
45 to_file(obj, tmpfile, ignore_subs)
46
47
48
49 tmpfile.close()
50 except IOError, e:
51 raise IOError("Could not write file %s.new (%s)" % (fn,e))
52
53 try:
54 os.unlink(fn+"~")
55 except OSError, e:
56 logger.debug("Error on removing file %s~ (%s) " % (fn,e))
57 try:
58 os.rename(fn,fn+"~")
59 except OSError, e:
60 logger.debug("Error on file backup %s (%s) " % (fn,e))
61 try:
62 os.rename(fn+".new",fn)
63 except OSError, e:
64 raise IOError("Error on moving file %s.new (%s) " % (fn,e))
65
def rmrf(name):
    """Recursively delete *name* (file or whole directory tree).

    All OSErrors are swallowed: missing paths and permission problems are
    silently ignored.  NOTE: uses os.removedirs, which also prunes empty
    parent directories of *name* after the tree is gone.
    """
    if not os.path.isdir(name):
        # Plain file (or nonexistent path): best-effort unlink.
        try:
            os.unlink(name)
        except OSError:
            pass
        return
    # Empty the directory first, then remove it (and empty parents).
    for entry in os.listdir(name):
        rmrf(os.path.join(name, entry))
    try:
        os.removedirs(name)
    except OSError:
        pass
79
81 """GangaRepository Local"""
82
88
def startup(self):
    """ Starts an repository and reads in a directory structure.
    Raise RepositoryError"""
    self._load_timestamp = {}        # data file ctime per id, maintained by load()
    self._cache_load_timestamp = {}  # index file ctime per id, maintained by index_load()
    self.known_bad_ids = []          # ids already reported as unloadable
    # Select the streamer pair matching the registry type.
    if "XML" in self.registry.type:
        self.to_file = xml_to_file
        self.from_file = xml_from_file
    elif "Pickle" in self.registry.type:
        self.to_file = pickle_to_file
        self.from_file = pickle_from_file
    else:
        # FIX: pass self (the repository) as the first argument, consistent
        # with every other RepositoryError raised in this class; the
        # original passed self.repo, which is not an attribute of this
        # object and would itself raise AttributeError.
        raise RepositoryError(self, "Unknown Repository type: %s" % self.registry.type)
    self.sessionlock = SessionLockManager(self, self.lockroot, self.registry.name)
    self.sessionlock.startup()
    # Build the in-memory object index from the on-disk structure.
    self.update_index(verbose=True)
107
108
def shutdown(self):
    """Shutdown the repository. Flushing is done by the Registry
    Raise RepositoryError"""
    # Only the session lock needs explicit teardown here.
    self.sessionlock.shutdown()
113
def get_fn(self, id):
    """ Returns the file name where the data for this object id is saved"""
    # FIX: explicit floor division so the 1000-object chunk directory name
    # ("0xxx", "1xxx", ...) stays an integer regardless of division semantics.
    return os.path.join(self.root, "%ixxx" % (id // 1000), "%i" % id, "data")
117
def get_idxfn(self, id):
    """ Returns the file name where the index cache for this object id is saved"""
    # FIX: docstring said "data" (copy-paste from get_fn) but this returns
    # the per-object *index* file; also use explicit floor division.
    return os.path.join(self.root, "%ixxx" % (id // 1000), "%i.index" % id)
121
123 """ load the index file for this object if necessary
124 Loads if never loaded or timestamp changed. Creates object if necessary
125 Returns True if this object has been changed, False if not
126 Raise IOError on access or unpickling error
127 Raise OSError on stat error
128 Raise PluginManagerError if the class name is not found"""
129 logger.debug("Loading index %s" % id)
130 fn = self.get_idxfn(id)
131 if self._cache_load_timestamp.get(id,0) != os.stat(fn).st_ctime:
132 fobj = file(fn)
133 try:
134 try:
135 cat,cls,cache = pickle_from_file(fobj)[0]
136 except Exception, x:
137 raise IOError("Error on unpickling: %s %s" % (x.__class__.__name__, x))
138 if id in self.objects:
139 obj = self.objects[id]
140 if obj._data:
141 obj.__dict__["_registry_refresh"] = True
142 else:
143 obj = self._make_empty_object_(id,cat,cls)
144 obj._index_cache = cache
145 finally:
146 fobj.close()
147 self._cache_load_timestamp[id] = os.stat(fn).st_ctime
148 return True
149 return False
150
152 """ write an index file for this object (must be locked).
153 Should not raise any Errors """
154 obj = self.objects[id]
155 try:
156 ifn = self.get_idxfn(id)
157 new_idx_cache = self.registry.getIndexCache(obj)
158 if new_idx_cache != obj._index_cache or not os.path.exists(ifn):
159 obj._index_cache = new_idx_cache
160 pickle_to_file((obj._category,obj._name,obj._index_cache),file(ifn,"w"))
161 except IOError, x:
162 logger.error("Index saving to '%s' failed: %s %s" % (ifn,x.__class__.__name__,x))
163
def get_index_listing(self):
    """Get dictionary of possible objects in the Repository: True means index is present,
    False if not present
    Raise RepositoryError"""
    # Chunk directories are named "<n>xxx" and hold up to 1000 objects each.
    try:
        chunk_dirs = [d for d in os.listdir(self.root) if d.endswith("xxx") and d[:-3].isdigit()]
    except OSError:
        raise RepositoryError(self, "Could not list repository '%s'!" % (self.root))
    objs = {}
    for chunk in chunk_dirs:
        chunk_path = os.path.join(self.root, chunk)
        try:
            listing = os.listdir(chunk_path)
        except OSError:
            raise RepositoryError(self, "Could not list repository '%s'!" % (chunk_path))
        # Numeric entries are object directories; assume no index at first.
        for entry in listing:
            if entry.isdigit():
                objs[int(entry)] = False
        # Then mark ids that also have an "<id>.index" file.
        for entry in listing:
            if entry.endswith(".index") and entry[:-6].isdigit():
                id = int(entry[:-6])
                if id in objs:
                    objs[id] = True
                else:
                    # Index without a data directory: stale leftover, drop it.
                    try:
                        os.unlink(self.get_idxfn(id))
                        logger.warning("Deleted index file without data file: %s" % self.get_idxfn(id))
                    except OSError:
                        pass
    return objs
191
193 """ Update the list of available objects
194 Raise RepositoryError"""
195
196 logger.debug("updating index...")
197 objs = self.get_index_listing()
198 changed_ids = []
199 deleted_ids = Set(self.objects.keys())
200 summary = []
201 for id, idx in objs.iteritems():
202 deleted_ids.discard(id)
203
204 if id > self.sessionlock.count:
205 self.sessionlock.count = id + 1
206
207 if id in self.sessionlock.locked:
208 continue
209
210 try:
211 if self.index_load(id):
212 changed_ids.append(id)
213 continue
214 except IOError, x:
215 logger.debug("IOError: Failed to load index %i: %s" % (id,x))
216 except OSError, x:
217 logger.debug("OSError: Failed to load index %i: %s" % (id,x))
218 except PluginManagerError, x:
219 logger.debug("PluginManagerError: Failed to load index %i: %s" % (id,x))
220 summary.append((id,x))
221 continue
222 if not id in self.objects:
223 try:
224 self.load([id])
225 changed_ids.append(id)
226
227 if len(self.lock([id])) != 0:
228 self.index_write(id)
229 self.unlock([id])
230 except KeyError:
231
232 if id in self.objects:
233 self._internal_del__(id)
234 changed_ids.append(id)
235 except InaccessibleObjectError, x:
236 logger.debug("Failed to load id %i: %s %s" % (id, x.orig.__class__.__name__, x.orig))
237 summary.append((id,x.orig))
238
239 for id in deleted_ids:
240 self._internal_del__(id)
241 changed_ids.append(id)
242 if len(deleted_ids) > 0:
243 logger.warning("Registry '%s': Job %s externally deleted." % (self.registry.name, ",".join(map(str,list(deleted_ids)))))
244
245 if len(summary) > 0:
246 cnt = {}
247 examples = {}
248 for id,x in summary:
249 if id in self.known_bad_ids:
250 continue
251 cnt[x.__class__.__name__] = cnt.get(x.__class__.__name__,[]) + [str(id)]
252 examples[x.__class__.__name__] = str(x)
253 self.known_bad_ids.append(id)
254 for exc,ids in cnt.items():
255 logger.error("Registry '%s': Failed to load %i jobs (IDs: %s) due to '%s' (first error: %s)" % (self.registry.name, len(ids), ",".join(ids), exc, examples[exc]))
256 global printed_explanation
257 if not printed_explanation:
258 logger.error("If you want to delete the incomplete objects, you can type 'for i in %s.incomplete_ids(): %s(i).remove()' (press 'Enter' twice)" % (self.registry.name, self.registry.name))
259 printed_explanation = True
260 logger.debug("updated index done")
261 return changed_ids
262
263 - def add(self, objs, force_ids = None):
264 """ Add the given objects to the repository, forcing the IDs if told to.
265 Raise RepositoryError"""
266 if not force_ids is None:
267 if not len(objs) == len(force_ids):
268 raise RepositoryError(self, "Internal Error: add with different number of objects and force_ids!")
269 ids = force_ids
270 else:
271 ids = self.sessionlock.make_new_ids(len(objs))
272 for i in range(0,len(objs)):
273 fn = self.get_fn(ids[i])
274 try:
275 os.makedirs(os.path.dirname(fn))
276 except OSError, e:
277 if e.errno != errno.EEXIST:
278 raise RepositoryError(self,"OSError on mkdir: %s" % (str(e)))
279 self._internal_setitem__(ids[i], objs[i])
280
281 if self.sub_split and self.sub_split in objs[i]._data:
282 try:
283 for j in range(len(objs[i]._data[self.sub_split])):
284 objs[i]._data[self.sub_split][j]._dirty = True
285 except AttributeError:
286 pass
287 return ids
288
290 for id in ids:
291 try:
292 fn = self.get_fn(id)
293 obj = self.objects[id]
294 if obj._name != "EmptyGangaObject":
295 split_cache = None
296 do_sub_split = (not self.sub_split is None) and (self.sub_split in obj._data) and len(obj._data[self.sub_split]) > 0 and hasattr(obj._data[self.sub_split][0],"_dirty")
297 if do_sub_split:
298 split_cache = obj._data[self.sub_split]
299 for i in range(len(split_cache)):
300 if not split_cache[i]._dirty:
301 continue
302 sfn = os.path.join(os.path.dirname(fn),str(i),"data")
303 try:
304 os.makedirs(os.path.dirname(sfn))
305 except OSError, e:
306 if e.errno != errno.EEXIST:
307 raise RepositoryError(self,"OSError: " + str(e))
308 safe_save(sfn, split_cache[i], self.to_file)
309 split_cache[i]._setFlushed()
310 safe_save(fn, obj, self.to_file, self.sub_split)
311
312 for idn in os.listdir(os.path.dirname(fn)):
313 if idn.isdigit() and int(idn) >= len(split_cache):
314 rmrf(os.path.join(os.path.dirname(fn),idn))
315 else:
316 safe_save(fn, obj, self.to_file, "")
317
318 for idn in os.listdir(os.path.dirname(fn)):
319 if idn.isdigit():
320 rmrf(os.path.join(os.path.dirname(fn),idn))
321 self.index_write(id)
322 obj._setFlushed()
323 except OSError, x:
324 raise RepositoryError(self,"OSError on flushing id '%i': %s" % (id,str(x)))
325 except IOError, x:
326 raise RepositoryError(self,"IOError on flushing id '%i': %s" % (id,str(x)))
327
328 - def load(self, ids, load_backup=False):
329 for id in ids:
330 fn = self.get_fn(id)
331 if load_backup:
332 fn = fn+"~"
333 try:
334 fobj = file(fn,"r")
335 except IOError, x:
336 if x.errno == errno.ENOENT:
337
338 try:
339 self._internal_del__(id)
340 os.unlink(os.path.dirname(fn)+".index")
341 except OSError:
342 pass
343 raise KeyError(id)
344 else:
345 raise RepositoryError(self,"IOError: " + str(x))
346 try:
347 must_load = (not id in self.objects) or (self.objects[id]._data is None)
348 tmpobj = None
349 if must_load or (self._load_timestamp.get(id,0) != os.fstat(fobj.fileno()).st_ctime):
350 tmpobj, errs = self.from_file(fobj)
351 do_sub_split = (not self.sub_split is None) and (self.sub_split in tmpobj._data) and len(tmpobj._data[self.sub_split]) == 0
352 if do_sub_split:
353 i = 0
354 ld = os.listdir(os.path.dirname(fn))
355 l = []
356 while str(i) in ld:
357 sfn = os.path.join(os.path.dirname(fn),str(i),"data")
358 if load_backup:
359 sfn = sfn+"~"
360 try:
361 sfobj = file(sfn,"r")
362 except IOError, x:
363 if x.errno == errno.ENOENT:
364 raise IOError("Subobject %i.%i not found: %s" % (id,i,x))
365 else:
366 raise RepositoryError(self,"IOError on loading subobject %i.%i: %s" % (id,i,x))
367 ff = self.from_file(sfobj)
368 l.append(ff[0])
369 errs.extend(ff[1])
370 i += 1
371 tmpobj._data[self.sub_split] = makeGangaListByRef(l)
372 if len(errs) > 0:
373 raise errs[0]
374
375
376
377 if id in self.objects:
378 obj = self.objects[id]
379 obj._data = tmpobj._data
380
381 for n, v in obj._data.items():
382 if isinstance(v,Node):
383 v._setParent(obj)
384 if (isinstance(v,list) or v.__class__.__name__ == "GangaList"):
385
386 for i in v:
387 if isinstance(i,Node):
388 i._setParent(obj)
389
390 if obj._index_cache:
391 new_idx_cache = self.registry.getIndexCache(obj)
392 if new_idx_cache != obj._index_cache:
393
394 if len(self.lock([id])) != 0:
395 self.index_write(id)
396 self.unlock([id])
397 logger.warning("Incorrect index cache of '%s' object #%s was corrected!" % (self.registry.name, id))
398
399 obj._index_cache = None
400 else:
401 self._internal_setitem__(id, tmpobj)
402 if do_sub_split:
403 try:
404 for sobj in self.objects[id]._data[self.sub_split]:
405 sobj._setParent(self.objects[id])
406 except AttributeError:
407 pass
408 self.objects[id]._data[self.sub_split]._setParent(self.objects[id])
409
410 self._load_timestamp[id] = os.fstat(fobj.fileno()).st_ctime
411 except RepositoryError:
412 raise
413 except Exception, x:
414 if load_backup:
415 logger.debug("Could not load backup object #%i: %s %s", id, x.__class__.__name__, x)
416 raise InaccessibleObjectError(self,id,x)
417
418 logger.debug("Could not load object #%i: %s %s", id, x.__class__.__name__, x)
419
420 try:
421 self.load([id],load_backup=True)
422 logger.warning("Object '%s' #%i loaded from backup file - the last changes may be lost.", self.registry.name, id)
423 continue
424 except Exception:
425 pass
426
427 if not id in self.incomplete_objects:
428 self.incomplete_objects.append(id)
429
430 try:
431 os.unlink(os.path.dirname(fn)+".index")
432 except OSError:
433 pass
434
435 raise InaccessibleObjectError(self,id,x)
436
def delete(self, ids):
    """Delete the objects with the given ids: index file, in-memory entry
    and the whole on-disk object directory."""
    for id in ids:
        fn = self.get_fn(id)
        # Best-effort removal of the index file first.
        try:
            os.unlink(os.path.dirname(fn) + ".index")
        except OSError:
            pass
        self._internal_del__(id)
        rmrf(os.path.dirname(fn))
447
448 - def lock(self,ids):
450
def unlock(self, ids):
    """Release the write locks on the given ids; logs an error if some
    of them could not be released."""
    released_ids = self.sessionlock.release_ids(ids)
    if len(released_ids) < len(ids):
        logger.error("The write locks of some objects could not be released!")
455
def get_lock_session(self, id):
    """get_lock_session(id)
    Tries to determine the session that holds the lock on id for information purposes, and return an informative string.
    Returns None on failure
    """
    # Pure delegation: the session lock manager owns this information.
    return self.sessionlock.get_lock_session(id)
462
def get_session_list(self):
    """get_session_list()
    Tries to determine the other sessions that are active and returns an informative string for each of them.
    """
    # Pure delegation to the session lock manager.
    return self.sessionlock.get_other_sessions()
468
def reap_locks(self):
    """reap_locks() --> True/False
    Remotely clear all foreign locks from the session.
    WARNING: This is not nice.
    Returns True on success, False on error."""
    # Pure delegation to the session lock manager.
    return self.sessionlock.reap_locks()
475
def clean(self):
    """clean() --> True/False
    Clear EVERYTHING in this repository, counter, all jobs, etc.
    WARNING: This is not nice."""
    # Stop cleanly, wipe the whole repository root, then restart fresh.
    self.shutdown()
    rmrf(self.root)
    self.startup()
483