Package Ganga :: Package Core :: Package GangaRepository :: Module GangaRepositoryXML
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Core.GangaRepository.GangaRepositoryXML

  1  # Note: Following stuff must be considered in a GangaRepository: 
  2  # 
  3  # * lazy loading 
  4  # * locking 
  5   
  6  from GangaRepository import GangaRepository, PluginManagerError, EmptyGangaObject, RepositoryError, InaccessibleObjectError 
  7  from Ganga.Utility.Config import getConfig 
  8  import os, os.path, fcntl, time, errno 
  9   
 10  from SessionLock import SessionLockManager 
 11   
 12  import Ganga.Utility.logging 
 13  logger = Ganga.Utility.logging.getLogger() 
 14   
 15  from Ganga.Core.GangaRepository.PickleStreamer import to_file as pickle_to_file 
 16  from Ganga.Core.GangaRepository.PickleStreamer import from_file as pickle_from_file 
 17               
 18  from Ganga.Core.GangaRepository.VStreamer import to_file as xml_to_file 
 19  from Ganga.Core.GangaRepository.VStreamer import from_file as xml_from_file 
 20   
 21  from Ganga.GPIDev.Lib.GangaList.GangaList import makeGangaListByRef 
 22  from Ganga.GPIDev.Base.Objects import Node 
 23   
 24   
 25  import sys 
 26  if sys.hexversion >= 0x020600F0: 
 27      Set = set 
 28  else: 
 29      from sets import Set 
 30   
 31   
 32  printed_explanation = False 
 33   
34 -def safe_save(fn,obj,to_file,ignore_subs=''):
35 """Writes a file safely, raises IOError on error""" 36 if not os.path.exists(fn): 37 # file does not exist, so make it fast! 38 try: 39 to_file(obj, file(fn,"w"), ignore_subs) 40 return 41 except IOError, e: 42 raise IOError("Could not write file '%s' (%s)" % (fn,e)) 43 try: 44 tmpfile = open(fn + ".new", "w") 45 to_file(obj, tmpfile, ignore_subs) 46 # Important: Flush, then sync file before renaming! 47 #tmpfile.flush() 48 #os.fsync(tmpfile.fileno()) 49 tmpfile.close() 50 except IOError, e: 51 raise IOError("Could not write file %s.new (%s)" % (fn,e)) 52 # Try to make backup copy... 53 try: 54 os.unlink(fn+"~") 55 except OSError, e: 56 logger.debug("Error on removing file %s~ (%s) " % (fn,e)) 57 try: 58 os.rename(fn,fn+"~") 59 except OSError, e: 60 logger.debug("Error on file backup %s (%s) " % (fn,e)) 61 try: 62 os.rename(fn+".new",fn) 63 except OSError, e: 64 raise IOError("Error on moving file %s.new (%s) " % (fn,e))
65
66 -def rmrf(name):
67 if os.path.isdir(name): 68 for sfn in os.listdir(name): 69 rmrf(os.path.join(name,sfn)) 70 try: 71 os.removedirs(name) 72 except OSError: 73 pass 74 else: 75 try: 76 os.unlink(name) 77 except OSError: 78 pass
79
80 -class GangaRepositoryLocal(GangaRepository):
81 """GangaRepository Local""" 82
83 - def __init__(self, registry):
84 super(GangaRepositoryLocal,self).__init__(registry) 85 self.sub_split = "subjobs" 86 self.root = os.path.join(self.registry.location,"6.0",self.registry.name) 87 self.lockroot = os.path.join(self.registry.location,"6.0")
88
89 - def startup(self):
90 """ Starts an repository and reads in a directory structure. 91 Raise RepositoryError""" 92 self._load_timestamp = {} 93 self._cache_load_timestamp = {} 94 self.known_bad_ids = [] 95 if "XML" in self.registry.type: 96 self.to_file = xml_to_file 97 self.from_file = xml_from_file 98 elif "Pickle" in self.registry.type: 99 self.to_file = pickle_to_file 100 self.from_file = pickle_from_file 101 else: 102 raise RepositoryError(self.repo, "Unknown Repository type: %s"%self.registry.type) 103 self.sessionlock = SessionLockManager(self, self.lockroot, self.registry.name) 104 self.sessionlock.startup() 105 # Load the list of files, this time be verbose and print out a summary of errors 106 self.update_index(verbose = True)
107 108
109 - def shutdown(self):
110 """Shutdown the repository. Flushing is done by the Registry 111 Raise RepositoryError""" 112 self.sessionlock.shutdown()
113
114 - def get_fn(self,id):
115 """ Returns the file name where the data for this object id is saved""" 116 return os.path.join(self.root,"%ixxx"%(id/1000), "%i"%id, "data")
117
118 - def get_idxfn(self,id):
119 """ Returns the file name where the data for this object id is saved""" 120 return os.path.join(self.root,"%ixxx"%(id/1000), "%i.index"%id)
121
122 - def index_load(self,id):
123 """ load the index file for this object if necessary 124 Loads if never loaded or timestamp changed. Creates object if necessary 125 Returns True if this object has been changed, False if not 126 Raise IOError on access or unpickling error 127 Raise OSError on stat error 128 Raise PluginManagerError if the class name is not found""" 129 logger.debug("Loading index %s" % id) 130 fn = self.get_idxfn(id) 131 if self._cache_load_timestamp.get(id,0) != os.stat(fn).st_ctime: # index timestamp changed 132 fobj = file(fn) 133 try: 134 try: 135 cat,cls,cache = pickle_from_file(fobj)[0] 136 except Exception, x: 137 raise IOError("Error on unpickling: %s %s" % (x.__class__.__name__, x)) 138 if id in self.objects: 139 obj = self.objects[id] 140 if obj._data: 141 obj.__dict__["_registry_refresh"] = True 142 else: 143 obj = self._make_empty_object_(id,cat,cls) 144 obj._index_cache = cache 145 finally: 146 fobj.close() 147 self._cache_load_timestamp[id] = os.stat(fn).st_ctime 148 return True 149 return False
150
151 - def index_write(self,id):
152 """ write an index file for this object (must be locked). 153 Should not raise any Errors """ 154 obj = self.objects[id] 155 try: 156 ifn = self.get_idxfn(id) 157 new_idx_cache = self.registry.getIndexCache(obj) 158 if new_idx_cache != obj._index_cache or not os.path.exists(ifn): 159 obj._index_cache = new_idx_cache 160 pickle_to_file((obj._category,obj._name,obj._index_cache),file(ifn,"w")) 161 except IOError, x: 162 logger.error("Index saving to '%s' failed: %s %s" % (ifn,x.__class__.__name__,x))
163
164 - def get_index_listing(self):
165 """Get dictionary of possible objects in the Repository: True means index is present, 166 False if not present 167 Raise RepositoryError""" 168 try: 169 obj_chunks = [d for d in os.listdir(self.root) if d.endswith("xxx") and d[:-3].isdigit()] 170 except OSError: 171 raise RepositoryError(self, "Could not list repository '%s'!" % (self.root)) 172 objs = {} # True means index is present, False means index not present 173 for c in obj_chunks: 174 try: 175 listing = os.listdir(os.path.join(self.root,c)) 176 except OSError: 177 raise RepositoryError(self, "Could not list repository '%s'!" % (os.path.join(self.root,c))) 178 objs.update(dict([(int(l),False) for l in listing if l.isdigit()])) 179 for l in listing: 180 if l.endswith(".index") and l[:-6].isdigit(): 181 id = int(l[:-6]) 182 if id in objs: 183 objs[id] = True 184 else: 185 try: 186 os.unlink(self.get_idxfn(id)) 187 logger.warning("Deleted index file without data file: %s" % self.get_idxfn(id)) 188 except OSError: 189 pass 190 return objs
191
192 - def update_index(self,id = None,verbose=False):
193 """ Update the list of available objects 194 Raise RepositoryError""" 195 # First locate and load the index files 196 logger.debug("updating index...") 197 objs = self.get_index_listing() 198 changed_ids = [] 199 deleted_ids = Set(self.objects.keys()) 200 summary = [] 201 for id, idx in objs.iteritems(): 202 deleted_ids.discard(id) 203 # Make sure we do not overwrite older jobs if someone deleted the count file 204 if id > self.sessionlock.count: 205 self.sessionlock.count = id + 1 206 # Locked IDs can be ignored 207 if id in self.sessionlock.locked: 208 continue 209 # Now we treat unlocked IDs 210 try: 211 if self.index_load(id): # if this succeeds, all is well and we are done 212 changed_ids.append(id) 213 continue 214 except IOError, x: 215 logger.debug("IOError: Failed to load index %i: %s" % (id,x)) 216 except OSError, x: 217 logger.debug("OSError: Failed to load index %i: %s" % (id,x)) 218 except PluginManagerError, x: 219 logger.debug("PluginManagerError: Failed to load index %i: %s" % (id,x)) # Probably should be DEBUG 220 summary.append((id,x)) # This is a FATAL error - do not try to load the main file, it will fail as well 221 continue 222 if not id in self.objects: # this is bad - no or corrupted index but object not loaded yet! Try to load it! 223 try: 224 self.load([id]) 225 changed_ids.append(id) 226 # Write out a new index if the file can be locked 227 if len(self.lock([id])) != 0: 228 self.index_write(id) 229 self.unlock([id]) 230 except KeyError: 231 # deleted job 232 if id in self.objects: 233 self._internal_del__(id) 234 changed_ids.append(id) 235 except InaccessibleObjectError, x: 236 logger.debug("Failed to load id %i: %s %s" % (id, x.orig.__class__.__name__, x.orig)) 237 summary.append((id,x.orig)) 238 # Check deleted files: 239 for id in deleted_ids: 240 self._internal_del__(id) 241 changed_ids.append(id) 242 if len(deleted_ids) > 0: 243 logger.warning("Registry '%s': Job %s externally deleted." % (self.registry.name, ",".join(map(str,list(deleted_ids))))) 244 245 if len(summary) > 0: 246 cnt = {} 247 examples = {} 248 for id,x in summary: 249 if id in self.known_bad_ids: 250 continue 251 cnt[x.__class__.__name__] = cnt.get(x.__class__.__name__,[]) + [str(id)] 252 examples[x.__class__.__name__] = str(x) 253 self.known_bad_ids.append(id) 254 for exc,ids in cnt.items(): 255 logger.error("Registry '%s': Failed to load %i jobs (IDs: %s) due to '%s' (first error: %s)" % (self.registry.name, len(ids), ",".join(ids), exc, examples[exc])) 256 global printed_explanation 257 if not printed_explanation: 258 logger.error("If you want to delete the incomplete objects, you can type 'for i in %s.incomplete_ids(): %s(i).remove()' (press 'Enter' twice)" % (self.registry.name, self.registry.name)) 259 printed_explanation = True 260 logger.debug("updated index done") 261 return changed_ids
262
263 - def add(self, objs, force_ids = None):
264 """ Add the given objects to the repository, forcing the IDs if told to. 265 Raise RepositoryError""" 266 if not force_ids is None: # assume the ids are already locked by Registry 267 if not len(objs) == len(force_ids): 268 raise RepositoryError(self, "Internal Error: add with different number of objects and force_ids!") 269 ids = force_ids 270 else: 271 ids = self.sessionlock.make_new_ids(len(objs)) 272 for i in range(0,len(objs)): 273 fn = self.get_fn(ids[i]) 274 try: 275 os.makedirs(os.path.dirname(fn)) 276 except OSError, e: 277 if e.errno != errno.EEXIST: 278 raise RepositoryError(self,"OSError on mkdir: %s" % (str(e))) 279 self._internal_setitem__(ids[i], objs[i]) 280 # Set subjobs dirty - they will not be flushed if they are not. 281 if self.sub_split and self.sub_split in objs[i]._data: 282 try: 283 for j in range(len(objs[i]._data[self.sub_split])): 284 objs[i]._data[self.sub_split][j]._dirty = True 285 except AttributeError: 286 pass # this is not a list of Ganga objects 287 return ids
288
289 - def flush(self, ids):
290 for id in ids: 291 try: 292 fn = self.get_fn(id) 293 obj = self.objects[id] 294 if obj._name != "EmptyGangaObject": 295 split_cache = None 296 do_sub_split = (not self.sub_split is None) and (self.sub_split in obj._data) and len(obj._data[self.sub_split]) > 0 and hasattr(obj._data[self.sub_split][0],"_dirty") 297 if do_sub_split: 298 split_cache = obj._data[self.sub_split] 299 for i in range(len(split_cache)): 300 if not split_cache[i]._dirty: 301 continue 302 sfn = os.path.join(os.path.dirname(fn),str(i),"data") 303 try: 304 os.makedirs(os.path.dirname(sfn)) 305 except OSError, e: 306 if e.errno != errno.EEXIST: 307 raise RepositoryError(self,"OSError: " + str(e)) 308 safe_save(sfn, split_cache[i], self.to_file) 309 split_cache[i]._setFlushed() 310 safe_save(fn, obj, self.to_file, self.sub_split) 311 # clean files not in subjobs anymore... (bug 64041) 312 for idn in os.listdir(os.path.dirname(fn)): 313 if idn.isdigit() and int(idn) >= len(split_cache): 314 rmrf(os.path.join(os.path.dirname(fn),idn)) 315 else: 316 safe_save(fn, obj, self.to_file, "") 317 # clean files leftover from sub_split 318 for idn in os.listdir(os.path.dirname(fn)): 319 if idn.isdigit(): 320 rmrf(os.path.join(os.path.dirname(fn),idn)) 321 self.index_write(id) 322 obj._setFlushed() 323 except OSError, x: 324 raise RepositoryError(self,"OSError on flushing id '%i': %s" % (id,str(x))) 325 except IOError, x: 326 raise RepositoryError(self,"IOError on flushing id '%i': %s" % (id,str(x)))
327
328 - def load(self, ids, load_backup=False):
329 for id in ids: 330 fn = self.get_fn(id) 331 if load_backup: 332 fn = fn+"~" 333 try: 334 fobj = file(fn,"r") 335 except IOError, x: 336 if x.errno == errno.ENOENT: 337 # remove index so we do not continue working with wrong information 338 try: 339 self._internal_del__(id) # remove internal representation 340 os.unlink(os.path.dirname(fn)+".index") 341 except OSError: 342 pass 343 raise KeyError(id) 344 else: 345 raise RepositoryError(self,"IOError: " + str(x)) 346 try: 347 must_load = (not id in self.objects) or (self.objects[id]._data is None) 348 tmpobj = None 349 if must_load or (self._load_timestamp.get(id,0) != os.fstat(fobj.fileno()).st_ctime): 350 tmpobj, errs = self.from_file(fobj) 351 do_sub_split = (not self.sub_split is None) and (self.sub_split in tmpobj._data) and len(tmpobj._data[self.sub_split]) == 0 352 if do_sub_split: 353 i = 0 354 ld = os.listdir(os.path.dirname(fn)) 355 l = [] 356 while str(i) in ld: 357 sfn = os.path.join(os.path.dirname(fn),str(i),"data") 358 if load_backup: 359 sfn = sfn+"~" 360 try: 361 sfobj = file(sfn,"r") 362 except IOError, x: 363 if x.errno == errno.ENOENT: 364 raise IOError("Subobject %i.%i not found: %s" % (id,i,x)) 365 else: 366 raise RepositoryError(self,"IOError on loading subobject %i.%i: %s" % (id,i,x)) 367 ff = self.from_file(sfobj) 368 l.append(ff[0]) 369 errs.extend(ff[1]) 370 i += 1 371 tmpobj._data[self.sub_split] = makeGangaListByRef(l) 372 if len(errs) > 0: 373 raise errs[0] 374 #if len(errs) > 0 and "status" in tmpobj._data: # MAGIC "status" if incomplete 375 # tmpobj._data["status"] = "incomplete" 376 # logger.error("Registry '%s': Could not load parts of object #%i: %s" % (self.registry.name,id,map(str,errs))) 377 if id in self.objects: 378 obj = self.objects[id] 379 obj._data = tmpobj._data 380 # Fix parent for objects in _data (necessary!) 381 for n, v in obj._data.items(): 382 if isinstance(v,Node): 383 v._setParent(obj) 384 if (isinstance(v,list) or v.__class__.__name__ == "GangaList"): 385 # set the parent of the list or dictionary (or other iterable) items 386 for i in v: 387 if isinstance(i,Node): 388 i._setParent(obj) 389 # Check if index cache; if loaded; was valid: 390 if obj._index_cache: 391 new_idx_cache = self.registry.getIndexCache(obj) 392 if new_idx_cache != obj._index_cache: 393 # index is wrong! Try to get read access - then we can fix this 394 if len(self.lock([id])) != 0: 395 self.index_write(id) 396 self.unlock([id]) 397 logger.warning("Incorrect index cache of '%s' object #%s was corrected!" % (self.registry.name, id)) 398 # if we cannot lock this, the inconsistency is most likely the result of another ganga process modifying the repo 399 obj._index_cache = None 400 else: 401 self._internal_setitem__(id, tmpobj) 402 if do_sub_split: 403 try: 404 for sobj in self.objects[id]._data[self.sub_split]: 405 sobj._setParent(self.objects[id]) 406 except AttributeError: 407 pass # not actually Ganga objects in the sub-split field 408 self.objects[id]._data[self.sub_split]._setParent(self.objects[id]) 409 410 self._load_timestamp[id] = os.fstat(fobj.fileno()).st_ctime 411 except RepositoryError: 412 raise 413 except Exception, x: 414 if load_backup: 415 logger.debug("Could not load backup object #%i: %s %s", id, x.__class__.__name__, x) 416 raise InaccessibleObjectError(self,id,x) 417 418 logger.debug("Could not load object #%i: %s %s", id, x.__class__.__name__, x) 419 # try loading backup 420 try: 421 self.load([id],load_backup=True) 422 logger.warning("Object '%s' #%i loaded from backup file - the last changes may be lost.", self.registry.name, id) 423 continue 424 except Exception: 425 pass 426 # add object to incomplete_objects 427 if not id in self.incomplete_objects: 428 self.incomplete_objects.append(id) 429 # remove index so we do not continue working with wrong information 430 try: 431 os.unlink(os.path.dirname(fn)+".index") 432 except OSError: 433 pass 434 #self._internal_setitem__(id, EmptyGangaObject()) // NO NO NO! BREAKS EVERYTHING HORRIBLY! 435 raise InaccessibleObjectError(self,id,x)
436
437 - def delete(self, ids):
438 for id in ids: 439 # First remove the index, so that it is gone if we later have a KeyError 440 fn = self.get_fn(id) 441 try: 442 os.unlink(os.path.dirname(fn)+".index") 443 except OSError: 444 pass 445 self._internal_del__(id) 446 rmrf(os.path.dirname(fn))
447
448 - def lock(self,ids):
449 return self.sessionlock.lock_ids(ids)
450
451 - def unlock(self,ids):
452 released_ids = self.sessionlock.release_ids(ids) 453 if len(released_ids) < len(ids): 454 logger.error("The write locks of some objects could not be released!")
455
456 - def get_lock_session(self,id):
457 """get_lock_session(id) 458 Tries to determine the session that holds the lock on id for information purposes, and return an informative string. 459 Returns None on failure 460 """ 461 return self.sessionlock.get_lock_session(id) 462
463 - def get_other_sessions(self):
464 """get_session_list() 465 Tries to determine the other sessions that are active and returns an informative string for each of them. 466 """ 467 return self.sessionlock.get_other_sessions() 468
469 - def reap_locks(self):
470 """reap_locks() --> True/False 471 Remotely clear all foreign locks from the session. 472 WARNING: This is not nice. 473 Returns True on success, False on error.""" 474 return self.sessionlock.reap_locks()
475
476 - def clean(self):
477 """clean() --> True/False 478 Clear EVERYTHING in this repository, counter, all jobs, etc. 479 WARNING: This is not nice.""" 480 self.shutdown() 481 rmrf(self.root) 482 self.startup()
483