Package Ganga :: Package Core :: Package GangaRepository :: Module Registry
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Core.GangaRepository.Registry

  1  import Ganga.Utility.logging 
  2  logger = Ganga.Utility.logging.getLogger() 
  3   
  4  from Ganga.Core import GangaException 
  5  from GangaRepository import InaccessibleObjectError 
  6   
  7  import time, threading 
  8   
  9  import sys 
 10   
 11  if sys.hexversion >= 0x020600F0: 
 12      Set = set 
 13  else: 
 14      from sets import Set 
 15   
class RegistryError(GangaException):
    """Base class of the errors raised by the Registry.

    The message given to the constructor is stored in the 'what' attribute
    and rendered by str() behind a class-specific prefix.
    """

    def __init__(self, what):
        GangaException.__init__(self, what)
        # kept separately so that subclasses can re-format it with their own prefix
        self.what = what

    def __str__(self):
        text = "RegistryError: %s" % self.what
        return text

class RegistryAccessError(RegistryError):
    """Raised when a request is valid in principle, but the Registry cannot
    be accessed at the moment (e.g. the repository is disconnected)."""

    def __str__(self):
        text = "RegistryAccessError: %s" % self.what
        return text

class RegistryLockError(RegistryError):
    """Raised when a request is valid in principle, but the object is
    locked by another Ganga session."""

    def __str__(self):
        text = "RegistryLockError: %s" % self.what
        return text

class ObjectNotInRegistryError(RegistryError):
    """Raised when an object has been associated with this registry but is
    not actually in it. This most probably indicates an internal Ganga error."""

    def __str__(self):
        text = "ObjectNotInRegistryError: %s" % self.what
        return text

class RegistryKeyError(RegistryError, KeyError):
    """Raised when the given id is not found in the registry.

    Also derives from KeyError, so mapping-style 'except KeyError' handlers
    catch it as well."""

    def __str__(self):
        text = "RegistryKeyError: %s" % self.what
        return text

class RegistryIndexError(RegistryError, IndexError):
    """Raised when the given id is not found in the registry.

    Also derives from IndexError, so sequence-style 'except IndexError'
    handlers catch it as well."""

    def __str__(self):
        text = "RegistryIndexError: %s" % self.what
        return text

def makeRepository(registry):
    """Factory that selects, imports and instantiates the correct GangaRepository.

    The implementation is chosen from registry.type:
      "LocalXML", "LocalPickle" -> GangaRepositoryXML.GangaRepositoryLocal
      "SQLite"                  -> GangaRepositorySQLite.GangaRepositorySQLite
      "Transient"               -> the GangaRepository base class
    Backends are imported lazily so that only the selected one is loaded.

    Raises RegistryError for an unknown repository type.
    """
    if registry.type in ["LocalXML", "LocalPickle"]:
        from GangaRepositoryXML import GangaRepositoryLocal
        return GangaRepositoryLocal(registry)
    elif registry.type in ["SQLite"]:
        from GangaRepositorySQLite import GangaRepositorySQLite
        return GangaRepositorySQLite(registry)
    elif registry.type in ["Transient"]:
        # The visible module-level import only brings in InaccessibleObjectError
        # from this module; without this import the branch raised NameError.
        from GangaRepository import GangaRepository
        return GangaRepository(registry)
    else:
        raise RegistryError("Repository %s: Unknown repository type %s" % (registry.name, registry.type))

class IncompleteObject(object):
    """Placeholder for an object that could not be loaded on startup.

    Registry.__getitem__ hands one out for ids listed as incomplete; the
    user can then try reload() again or remove() the object.
    """

    def __init__(self, registry, id):
        # registry: the owning Registry; id: the object's id in that registry
        self.registry = registry
        self.id = id

    def reload(self):
        """Try to load the object from the repository again and record its id
        as changed for every pollChangedJobs() client."""
        reg = self.registry
        reg._lock.acquire()
        try:
            reg.repository.load([self.id])
            print("Successfully reloaded '%s' object #%i!" % (reg.name, self.id))
            # Record the object's id (previously the builtin function 'id' was added).
            for changed in reg.changed_ids.values():
                changed.add(self.id)
        finally:
            reg._lock.release()

    def remove(self):
        """Lock the object, delete it from the repository and record its id as
        changed for every pollChangedJobs() client.

        Raises RegistryLockError if the object cannot be locked.
        """
        reg = self.registry
        reg._lock.acquire()
        try:
            if len(reg.repository.lock([self.id])) == 0:
                errstr = "Could not lock '%s' object #%i!" % (reg.name, self.id)
                try:
                    errstr += " Object is locked by session '%s' " % reg.repository.get_lock_session(self.id)
                except Exception:
                    # Best effort only: failing to name the lock owner must not
                    # replace the lock error raised below.
                    logger.warning("Could not determine the lock owner of '%s' object #%i: %s",
                                   reg.name, self.id, sys.exc_info()[1])
                raise RegistryLockError(errstr)
            reg.repository.delete([self.id])
            # Record the object's id (previously the builtin function 'id' was added).
            for changed in reg.changed_ids.values():
                changed.add(self.id)
        finally:
            reg._lock.release()

    def __repr__(self):
        return "Incomplete object in '%s', ID %i. Try reload() or remove()." % (self.registry.name, self.id)

class Registry(object):
    """Ganga Registry
    Base class providing a dict-like locked and lazy-loading interface to a Ganga repository.

    All modifications of dirty_objs and all repository accesses are done while
    holding self._lock (a re-entrant lock).
    """

    def __init__(self, name, doc, dirty_flush_counter=10, update_index_time=30):
        """Registry constructor, giving public name and documentation.

        dirty_flush_counter -- every dirty_flush_counter-th _dirty() call triggers a _flush()
        update_index_time   -- minimum number of seconds between two repository index updates
        """
        self.name = name
        self.doc = doc
        self._started = False
        self.dirty_flush_counter = dirty_flush_counter
        self.dirty_objs = []
        self.dirty_hits = 0
        self.update_index_time = update_index_time
        self._update_index_timer = 0
        self._needs_metadata = False
        self.metadata = None
        self._lock = threading.RLock()
        # client name (see pollChangedJobs) -> set of ids changed since that client's last poll
        self.changed_ids = {}

    # Methods intended to be called from ''outside code''

    def __getitem__(self, id):
        """ Returns the Ganga Object with the given id.
        Returns an IncompleteObject for ids that could not be loaded on startup.
        Raise RegistryKeyError"""
        try:
            return self._objects[id]
        except KeyError:
            if id in self._incomplete_objects:
                return IncompleteObject(self, id)
            raise RegistryKeyError("Could not find object #%s" % id)

    def __len__(self):
        """ Returns the current number of root objects """
        return len(self._objects)

    def __contains__(self, id):
        """ Returns True if the given ID is in the registry """
        return id in self._objects

    def _update_index_if_due(self):
        """Refresh the repository index if the registry is started and the last
        refresh is older than update_index_time seconds; ids reported as changed
        by the repository are recorded for every pollChangedJobs() client."""
        if self._started and time.time() > self._update_index_timer + self.update_index_time:
            self._lock.acquire()
            try:
                changed = self.repository.update_index()
                for d in self.changed_ids.values():
                    d.update(changed)
            finally:
                self._lock.release()
            self._update_index_timer = time.time()

    def ids(self):
        """ Returns the sorted list of ids of this registry (refreshing a stale index first) """
        self._update_index_if_due()
        return sorted(self._objects.keys())

    def items(self):
        """ Return the items (ID,obj) in this registry, sorted by ID.
        Recommended access for iteration, since accessing by ID can fail if the ID iterator is old"""
        self._update_index_if_due()
        return sorted(self._objects.items())

    def iteritems(self):
        """ Return the items (ID,obj) in this registry (same list as items())."""
        return self.items()

    def keys(self):
        """ Returns the list of ids of this registry """
        return self.ids()

    def values(self):
        """ Return the objects in this registry, in order of ID.
        Besides items() this is also recommended for iteration."""
        return [it[1] for it in self.items()]

    def __iter__(self):
        return iter(self.values())

    def find(self, obj):
        """Returns the id of the given object in this registry, or
        Raise ObjectNotInRegistryError if the Object is not found"""
        try:
            registry_id = obj._registry_id
            registered = self._objects[registry_id]
        except AttributeError:
            raise ObjectNotInRegistryError("Object %s does not seem to be in any registry!" % obj)
        except KeyError:
            raise ObjectNotInRegistryError("Object %s does not seem to be in this registry!" % obj)
        # Explicit test instead of 'assert': assertions are stripped under python -O,
        # which silently accepted duplicated objects.
        if not obj == registered:
            raise ObjectNotInRegistryError("Object %s is a duplicated version of the one in this registry!" % obj)
        return registry_id

    def clean(self, force=False):
        """Deletes all elements of the registry, if no other sessions are present.
        if force == True it removes them regardless of other sessions.
        Returns True on success, False on failure.
        Raise RegistryAccessError if the registry is disconnected."""
        if not self._started:
            raise RegistryAccessError("Cannot clean a disconnected repository!")
        self._lock.acquire()
        try:
            if not force:
                other_sessions = self.repository.get_other_sessions()
                if len(other_sessions) > 0:
                    logger.error("The following other sessions are active and have blocked the clearing of the repository: \n * %s" % ("\n * ".join(other_sessions)))
                    return False
            self.repository.reap_locks()
            # copy the ids: the repository may modify _objects while deleting
            self.repository.delete(list(self._objects.keys()))
            self.dirty_objs = []
            self.dirty_hits = 0
            self.changed_ids = {}
            self.repository.clean()
            # previously fell through returning None despite the documented contract
            return True
        finally:
            self._lock.release()

    # Methods that can be called by derived classes or Ganga-internal classes like Job
    # if the dirty objects list is modified, the methods must be locked by self._lock
    # all accesses to the repository must also be locked!

    def _add(self, obj, force_index=None):
        """ Add an object to the registry and assigns an ID to it.
        use force_index to set the index (for example for metadata). This overwrites existing objects!
        Returns the new id.
        Raises RepositoryError
        Raise RegistryAccessError
        Raise RegistryLockError if force_index cannot be locked"""
        if not self._started:
            raise RegistryAccessError("Cannot add objects to a disconnected repository!")
        self._lock.acquire()
        try:
            if force_index is None:
                ids = self.repository.add([obj])
            else:
                if len(self.repository.lock([force_index])) == 0:
                    raise RegistryLockError("Could not lock '%s' id #%i for a new object!" % (self.name, force_index))
                ids = self.repository.add([obj], [force_index])  # raises exception if len(ids) < 1
            obj._registry_locked = True
            self.repository.flush(ids)
            for d in self.changed_ids.values():
                d.update(ids)
            return ids[0]
        finally:
            self._lock.release()

    def _remove(self, obj, auto_removed=0):
        """ Private method removing the obj from the registry. This method is always called.
        This method may be overriden in the subclass to trigger additional actions on the removal.
        'auto_removed' is set to true if this method is called in the context of obj.remove() method to avoid recursion.
        Only then the removal takes place. In the opposite case the obj.remove() is called first which eventually calls
        this method again with "auto_removed" set to true. This is done so that obj.remove() is ALWAYS called once independent
        on the removing context.
        Raise RepositoryError
        Raise RegistryAccessError
        Raise RegistryLockError
        Raise ObjectNotInRegistryError"""
        if not self._started:
            raise RegistryAccessError("Cannot remove objects from a disconnected repository!")
        if not auto_removed and "remove" in obj.__dict__:
            obj.remove()
            return
        oid = self.find(obj)
        try:
            self._write_access(obj)
        except RegistryKeyError:
            logger.warning("double delete: Object #%i is not present in registry '%s'!" % (oid, self.name))
            return
        logger.debug('deleting the object %d from the registry %s', oid, self.name)
        self._lock.acquire()
        try:
            if obj in self.dirty_objs:
                self.dirty_objs.remove(obj)
            self.repository.delete([oid])
            for d in self.changed_ids.values():
                d.add(oid)
        finally:
            self._lock.release()

    def _dirty(self, obj):
        """ Mark an object as dirty.
        Trigger automatic flush after specified number of dirty hits
        Raise RepositoryError
        Raise RegistryAccessError
        Raise RegistryLockError"""
        logger.debug("_dirty(%s)" % id(obj))
        self._write_access(obj)
        self._lock.acquire()
        try:
            if obj not in self.dirty_objs:
                self.dirty_objs.append(obj)
            self.dirty_hits += 1
            if self.dirty_hits % self.dirty_flush_counter == 0:
                self._flush()
            # HACK for GangaList: there _dirty is called _before_ the object is modified,
            # so the object has to stay dirty even if it was just flushed
            if obj not in self.dirty_objs:
                self.dirty_objs.append(obj)
            if self.changed_ids:
                oid = self.find(obj)
                for d in self.changed_ids.values():
                    d.add(oid)
        finally:
            self._lock.release()

    def _flush(self, objs=None):
        """Flush a set of objects (plus all currently dirty objects) to the persistency layer immediately
        Raise RepositoryError
        Raise RegistryAccessError
        Raise RegistryLockError"""
        if objs is None:  # avoid a shared mutable default argument
            objs = []
        if not self._started:
            raise RegistryAccessError("Cannot flush to a disconnected repository!")
        for obj in objs:
            self._write_access(obj)

        self._lock.acquire()
        try:
            for obj in objs:
                if obj not in self.dirty_objs:
                    self.dirty_objs.append(obj)
            ids = []
            for obj in self.dirty_objs:
                try:
                    ids.append(self.find(obj))
                except ObjectNotInRegistryError:
                    logger.error(sys.exc_info()[1].what)
            logger.debug("repository.flush(%s)" % ids)
            self.repository.flush(ids)
            self.dirty_objs = []
        finally:
            self._lock.release()

    def _read_access(self, obj, sub_obj=None):
        """Obtain read access on a given object, loading it from the repository if
        it is not loaded yet or flagged with _registry_refresh.
        sub-obj is the object the read access is actually desired (ignored at the moment)
        Raise RegistryAccessError
        Raise RegistryKeyError"""
        if not obj._data or "_registry_refresh" in obj.__dict__:
            if not self._started:
                raise RegistryAccessError("The object #%i in registry '%s' is not fully loaded and the registry is disconnected! Type 'reactivate()' if you want to reconnect." % (self.find(obj), self.name))
            obj.__dict__.pop("_registry_refresh", None)
            self._lock.acquire()
            try:
                oid = self.find(obj)
                try:
                    self.repository.load([oid])
                except KeyError:
                    raise RegistryKeyError("The object #%i in registry '%s' was deleted!" % (oid, self.name))
                except InaccessibleObjectError:
                    raise RegistryKeyError("The object #%i in registry '%s' could not be accessed - %s!" % (oid, self.name, str(sys.exc_info()[1])))
                for d in self.changed_ids.values():
                    d.add(oid)
            finally:
                self._lock.release()

    def _write_access(self, obj):
        """Obtain write access on a given object: lock it in the repository and
        (re)load it. Returns True.
        Raise RepositoryError
        Raise RegistryAccessError
        Raise RegistryLockError
        Raise ObjectNotInRegistryError"""
        if not self._started:
            raise RegistryAccessError("Cannot get write access to a disconnected repository!")
        if not obj._registry_locked:
            self._lock.acquire()
            try:
                oid = self.find(obj)
                try:
                    if len(self.repository.lock([oid])) == 0:
                        errstr = "Could not lock '%s' object #%i!" % (self.name, oid)
                        try:
                            errstr += " Object is locked by session '%s' " % self.repository.get_lock_session(oid)
                        except Exception:
                            # best effort: the lock error below must still be raised
                            logger.warning("Could not determine the lock owner of '%s' object #%i: %s",
                                           self.name, oid, sys.exc_info()[1])
                        raise RegistryLockError(errstr)
                finally:  # try to load even if lock fails
                    try:
                        obj.__dict__.pop("_registry_refresh", None)
                        self.repository.load([oid])
                    except KeyError:
                        raise RegistryKeyError("The object #%i in registry '%s' was deleted!" % (oid, self.name))
                    except InaccessibleObjectError:
                        raise RegistryKeyError("The object #%i in registry '%s' could not be accessed - %s!" % (oid, self.name, str(sys.exc_info()[1])))
                    for d in self.changed_ids.values():
                        d.add(oid)
                obj._registry_locked = True
            finally:
                self._lock.release()
        return True

    def _release_lock(self, obj):
        """Release the lock on a given object, flushing it first if it is dirty.
        Raise RepositoryError
        Raise RegistryAccessError
        Raise ObjectNotInRegistryError"""
        if not self._started:
            raise RegistryAccessError("Cannot manipulate locks of a disconnected repository!")
        logger.debug("_release_lock(%s)" % id(obj))
        self._lock.acquire()
        try:
            if obj._registry_locked:
                oid = self.find(obj)
                if obj in self.dirty_objs:
                    self.repository.flush([oid])
                    self.dirty_objs.remove(obj)
                obj._registry_locked = False
                self.repository.unlock([oid])
        finally:
            self._lock.release()

    def pollChangedJobs(self, name):
        """Returns a list of job ids that changed since the last call of this function.
        On first invocation returns a list of all ids.
        "name" should be a unique identifier of the user of this information."""
        self._lock.acquire()
        try:
            self._update_index_if_due()
            res = self.changed_ids.get(name)
            if res is None:
                # first poll by this client: everything counts as changed
                # (computed only when needed instead of on every call)
                res = Set(self.ids())
            self.changed_ids[name] = Set()
            return res
        finally:
            self._lock.release()

    def getIndexCache(self, obj):
        """Returns a dictionary to be put into obj._index_cache
        This can and should be overwritten by derived Registries to provide more index values."""
        return {}

    def startup(self):
        """Connect the repository to the registry. Called from Repository_runtime.py"""
        self._lock.acquire()
        try:
            t0 = time.time()
            self.repository = makeRepository(self)
            self._objects = self.repository.objects
            self._incomplete_objects = self.repository.incomplete_objects

            if self._needs_metadata:
                if self.metadata is None:
                    self.metadata = Registry(self.name + ".metadata", "Metadata repository for %s" % self.name,
                                             dirty_flush_counter=self.dirty_flush_counter,
                                             update_index_time=self.update_index_time)
                    self.metadata.type = self.type
                    self.metadata.location = self.location
                    self.metadata._parent = self
                self.metadata.startup()

            self.repository.startup()
            # All Ids could have changed
            self.changed_ids = {}
            t1 = time.time()
            logger.info("Registry '%s' [%s] startup time: %s sec" % (self.name, self.type, t1 - t0))
            self._started = True
        finally:
            self._lock.release()

    def shutdown(self):
        """Flush and disconnect the repository. Called from Repository_runtime.py """
        self._lock.acquire()
        try:
            try:
                if self.metadata is not None:
                    self.metadata.shutdown()
            except Exception:
                logger.error("Exception on shutting down metadata repository '%s' registry: %s", self.name, sys.exc_info()[1])
            try:
                self._flush()
            except Exception:
                logger.error("Exception on flushing '%s' registry: %s", self.name, sys.exc_info()[1])
            self._started = False
            for obj in self._objects.values():
                obj._registry_locked = False  # locks are not guaranteed to survive repository shutdown
            self.repository.shutdown()
        finally:
            self._lock.release()

    def info(self, full=False):
        """Returns a short informative string: registry name and number of objects.
        If full is True, the other concurrent sessions reported by the repository are listed too."""
        self._lock.acquire()
        try:
            s = "registry '%s': %i objects" % (self.name, len(self._objects))
            if full:
                other_sessions = self.repository.get_other_sessions()
                if len(other_sessions) > 0:
                    s += ", %i other concurrent sessions:\n * %s" % (len(other_sessions), "\n * ".join(other_sessions))
            return s
        finally:
            self._lock.release()

    def print_other_sessions(self):
        """Log a warning listing the other concurrent sessions, if there are any."""
        other_sessions = self.repository.get_other_sessions()
        if len(other_sessions) > 0:
            logger.warning("%i other concurrent sessions:\n * %s" % (len(other_sessions), "\n * ".join(other_sessions)))
