1 import Ganga.Utility.logging
2 logger = Ganga.Utility.logging.getLogger()
3
4 from Ganga.Core import GangaException
5 from GangaRepository import InaccessibleObjectError
6
7 import time, threading
8
9 import sys
10
11 if sys.hexversion >= 0x020600F0:
12 Set = set
13 else:
14 from sets import Set
15
21 return "RegistryError: %s"%self.what
22
24 """ This error is raised if the request is valid in principle,
25 but the Registry cannot be accessed at the moment."""
27 return "RegistryAccessError: %s"%self.what
28
30 """ This error is raised if the request is valid in principle,
31 but the object is locked by another Ganga session"""
33 return "RegistryLockError: %s"%self.what
34
36 """ This error is raised if an object has been associated to this registry,
37 but is not actually in the registry. This most probably indicates an internal Ganga error."""
39 return "ObjectNotInRegistryError: %s"%self.what
40
42 """ This error is raised if the given id is not found in the registry """
44 return "RegistryKeyError: %s"%self.what
45
47 """ This error is raised if the given id is not found in the registry """
49 return "RegistryIndexError: %s"%self.what
50
63
65 """ This class represents an object that could not be loaded on startup"""
67 self.registry = registry
68 self.id = id
69
71 self.registry._lock.acquire()
72 try:
73 self.registry.repository.load([self.id])
74 print "Successfully reloaded '%s' object #%i!" % (self.registry.name,self.id)
75 for d in self.registry.changed_ids.itervalues():
76 d.add(id)
77 finally:
78 self.registry._lock.release()
79
81 self.registry._lock.acquire()
82 try:
83 if len(self.registry.repository.lock([self.id])) == 0:
84 errstr = "Could not lock '%s' object #%i!" % (self.registry.name,self.id)
85 try:
86 errstr += " Object is locked by session '%s' " % self.registry.repository.get_lock_session(self.id)
87 except Exception, x:
88 print x
89 pass
90 raise RegistryLockError(errstr)
91 self.registry.repository.delete([self.id])
92 for d in self.registry.changed_ids.itervalues():
93 d.add(id)
94 finally:
95 self.registry._lock.release()
96
98 return "Incomplete object in '%s', ID %i. Try reload() or remove()." % (self.registry.name,self.id)
99
101 """Ganga Registry
102 Base class providing a dict-like locked and lazy-loading interface to a Ganga repository
103 """
104
105 - def __init__(self, name, doc, dirty_flush_counter=10, update_index_time = 30):
106 """Registry constructor, giving public name and documentation"""
107 self.name = name
108 self.doc = doc
109 self._started = False
110 self.dirty_flush_counter = dirty_flush_counter
111 self.dirty_objs = []
112 self.dirty_hits = 0
113 self.update_index_time = update_index_time
114 self._update_index_timer = 0
115 self._needs_metadata = False
116 self.metadata = None
117 self._lock = threading.RLock()
118 self.changed_ids = {}
119
120
122 """ Returns the Ganga Object with the given id.
123 Raise RegistryKeyError"""
124 try:
125 return self._objects[id]
126 except KeyError:
127 if id in self._incomplete_objects:
128 return IncompleteObject(self, id)
129 raise RegistryKeyError("Could not find object #%s" % id)
130
132 """ Returns the current number of root objects """
133 return len(self._objects)
134
136 """ Returns True if the given ID is in the registry """
137 return id in self._objects
138
140 """ Returns the list of ids of this registry """
141 if self._started and time.time() > self._update_index_timer + self.update_index_time:
142 self._lock.acquire()
143 try:
144 changed_ids = self.repository.update_index()
145 for d in self.changed_ids.itervalues():
146 d.update(changed_ids)
147 finally:
148 self._lock.release()
149 self._update_index_timer = time.time()
150
151 k = self._objects.keys()
152 k.sort()
153 return k
154
156 """ Return the items (ID,obj) in this registry.
157 Recommended access for iteration, since accessing by ID can fail if the ID iterator is old"""
158 if self._started and time.time() > self._update_index_timer + self.update_index_time:
159 self._lock.acquire()
160 try:
161 changed_ids = self.repository.update_index()
162 for d in self.changed_ids.itervalues():
163 d.update(changed_ids)
164 finally:
165 self._lock.release()
166 self._update_index_timer = time.time()
167
168 its = self._objects.items()
169 its.sort()
170 return its
171
173 """ Return the items (ID,obj) in this registry."""
174 return self.items()
175
177 """ Returns the list of ids of this registry """
178 return self.ids()
179
181 """ Return the objects in this registry, in order of ID.
182 Besides items() this is also recommended for iteration."""
183 return [it[1] for it in self.items()]
184
186 return iter(self.values())
187
def find(self, obj):
    """Return the id of the given object in this registry.

    Raise ObjectNotInRegistryError if
      * the object carries no _registry_id (it is not in any registry),
      * no object is stored under that id (it is not in this registry),
      * the object stored under that id does not compare equal to obj
        (obj is a duplicate of the registered object).
    """
    try:
        registry_id = obj._registry_id
        # Explicit comparison instead of 'assert': assertions are removed when
        # Python runs with -O, which would silently accept duplicated objects.
        duplicated = not (obj == self._objects[registry_id])
    except AttributeError:
        raise ObjectNotInRegistryError("Object %s does not seem to be in any registry!" % obj)
    except KeyError:
        raise ObjectNotInRegistryError("Object %s does not seem to be in this registry!" % obj)
    if duplicated:
        raise ObjectNotInRegistryError("Object %s is a duplicated version of the one in this registry!" % obj)
    return registry_id
200
def clean(self,force=False):
    """Delete all elements of the registry, if no other sessions are present.

    force -- if True, the elements are removed regardless of other sessions.

    Returns True on success, False if other active sessions blocked the cleaning.
    Raise RegistryAccessError if the registry is not started.
    """
    if not self._started:
        raise RegistryAccessError("Cannot clean a disconnected repository!")
    self._lock.acquire()
    try:
        if not force:
            other_sessions = self.repository.get_other_sessions()
            if len(other_sessions) > 0:
                logger.error("The following other sessions are active and have blocked the clearing of the repository: \n * %s" % ("\n * ".join(other_sessions)))
                return False
        self.repository.reap_locks()
        # Snapshot the ids first: the repository may modify the object
        # dictionary while it deletes the entries.
        self.repository.delete(list(self._objects.keys()))
        # Nothing is left to flush or to report as changed
        self.dirty_objs = []
        self.dirty_hits = 0
        self.changed_ids = {}
        self.repository.clean()
        # The documented contract is "True on success"; previously the
        # success path fell through and returned None.
        return True
    finally:
        self._lock.release()
222
223
224
225
226
227 - def _add(self,obj,force_index=None):
228 """ Add an object to the registry and assigns an ID to it.
229 use force_index to set the index (for example for metadata). This overwrites existing objects!
230 Raises RepositoryError"""
231 if not self._started:
232 raise RegistryAccessError("Cannot add objects to a disconnected repository!")
233 self._lock.acquire()
234 try:
235 if force_index is None:
236 ids = self.repository.add([obj])
237 else:
238 if len(self.repository.lock([force_index])) == 0:
239 raise RegistryLockError("Could not lock '%s' id #%i for a new object!" % (self.name,force_index))
240 ids = self.repository.add([obj],[force_index])
241 obj._registry_locked = True
242 self.repository.flush(ids)
243 for d in self.changed_ids.itervalues():
244 d.update(ids)
245 return ids[0]
246 finally:
247 self._lock.release()
248
249 - def _remove(self,obj,auto_removed=0):
250 """ Private method removing the obj from the registry. This method always called.
251 This method may be overriden in the subclass to trigger additional actions on the removal.
252 'auto_removed' is set to true if this method is called in the context of obj.remove() method to avoid recursion.
253 Only then the removal takes place. In the opposite case the obj.remove() is called first which eventually calls
254 this method again with "auto_removed" set to true. This is done so that obj.remove() is ALWAYS called once independent
255 on the removing context.
256 Raise RepositoryError
257 Raise RegistryAccessError
258 Raise RegistryLockError
259 Raise ObjectNotInRegistryError"""
260 if not self._started:
261 raise RegistryAccessError("Cannot remove objects from a disconnected repository!")
262 if not auto_removed and "remove" in obj.__dict__:
263 obj.remove()
264 else:
265 id = self.find(obj)
266 try:
267 self._write_access(obj)
268 except RegistryKeyError:
269 logger.warning("double delete: Object #%i is not present in registry '%s'!"%(id,self.name))
270 return
271 logger.debug('deleting the object %d from the registry %s',id,self.name)
272 self._lock.acquire()
273 try:
274 if obj in self.dirty_objs:
275 self.dirty_objs.remove(obj)
276 self.repository.delete([id])
277 del obj
278 for d in self.changed_ids.itervalues():
279 d.add(id)
280 finally:
281 self._lock.release()
282
283
285 """ Mark an object as dirty.
286 Trigger automatic flush after specified number of dirty hits
287 Raise RepositoryError
288 Raise RegistryAccessError
289 Raise RegistryLockError"""
290 logger.debug("_dirty(%s)" % id(obj))
291 self._write_access(obj)
292 self._lock.acquire()
293 try:
294 if not obj in self.dirty_objs:
295 self.dirty_objs.append(obj)
296 self.dirty_hits += 1
297 if self.dirty_hits % self.dirty_flush_counter == 0:
298 self._flush()
299
300 if not obj in self.dirty_objs:
301 self.dirty_objs.append(obj)
302 for d in self.changed_ids.itervalues():
303 d.add(self.find(obj))
304 finally:
305 self._lock.release()
306
308 """Flush a set of objects to the persistency layer immediately
309 Raise RepositoryError
310 Raise RegistryAccessError
311 Raise RegistryLockError"""
312
313 if not self._started:
314 raise RegistryAccessError("Cannot flush to a disconnected repository!")
315 for obj in objs:
316 self._write_access(obj)
317
318 self._lock.acquire()
319 try:
320 for obj in objs:
321 if not obj in self.dirty_objs:
322 self.dirty_objs.append(obj)
323 ids = []
324 for obj in self.dirty_objs:
325 try:
326 ids.append(self.find(obj))
327 except ObjectNotInRegistryError, x:
328 logger.error(x.what)
329 logger.debug("repository.flush(%s)" % ids)
330 self.repository.flush(ids)
331 self.dirty_objs = []
332 finally:
333 self._lock.release()
334
336 """Obtain read access on a given object.
337 sub-obj is the object the read access is actually desired (ignored at the moment)
338 Raise RegistryAccessError
339 Raise RegistryKeyError"""
340
341 if not obj._data or "_registry_refresh" in obj.__dict__:
342 if not self._started:
343 raise RegistryAccessError("The object #%i in registry '%s' is not fully loaded and the registry is disconnected! Type 'reactivate()' if you want to reconnect."%(self.find(obj),self.name))
344 obj.__dict__.pop("_registry_refresh",None)
345 assert not "_registry_refresh" in obj.__dict__
346 self._lock.acquire()
347 try:
348 id = self.find(obj)
349 try:
350 self.repository.load([id])
351 except KeyError:
352 raise RegistryKeyError("The object #%i in registry '%s' was deleted!" % (id,self.name))
353 except InaccessibleObjectError, x:
354 raise RegistryKeyError("The object #%i in registry '%s' could not be accessed - %s!" % (id,self.name,str(x)))
355 for d in self.changed_ids.itervalues():
356 d.add(id)
357 finally:
358 self._lock.release()
359
360
361
363 """Obtain write access on a given object.
364 Raise RepositoryError
365 Raise RegistryAccessError
366 Raise RegistryLockError
367 Raise ObjectNotInRegistryError"""
368
369 if not self._started:
370 raise RegistryAccessError("Cannot get write access to a disconnected repository!")
371 if not obj._registry_locked:
372 self._lock.acquire()
373 try:
374 id = self.find(obj)
375 try:
376 if len(self.repository.lock([self.find(obj)])) == 0:
377 errstr = "Could not lock '%s' object #%i!" % (self.name,self.find(obj))
378 try:
379 errstr += " Object is locked by session '%s' " % self.repository.get_lock_session(self.find(obj))
380 except Exception, x:
381 print x
382 pass
383 raise RegistryLockError(errstr)
384 finally:
385 try:
386 obj.__dict__.pop("_registry_refresh",None)
387 self.repository.load([id])
388 except KeyError:
389 raise RegistryKeyError("The object #%i in registry '%s' was deleted!" % (id,self.name))
390 except InaccessibleObjectError, x:
391 raise RegistryKeyError("The object #%i in registry '%s' could not be accessed - %s!" % (id,self.name,str(x)))
392 for d in self.changed_ids.itervalues():
393 d.add(id)
394 obj._registry_locked = True
395 finally:
396 self._lock.release()
397 return True
398
400 """Release the lock on a given object.
401 Raise RepositoryError
402 Raise RegistryAccessError
403 Raise ObjectNotInRegistryError"""
404 if not self._started:
405 raise RegistryAccessError("Cannot manipulate locks of a disconnected repository!")
406 logger.debug("_release_lock(%s)" % id(obj))
407 self._lock.acquire()
408 try:
409 if obj._registry_locked:
410 oid = self.find(obj)
411 if obj in self.dirty_objs:
412 self.repository.flush([oid])
413 self.dirty_objs.remove(obj)
414 obj._registry_locked = False
415 self.repository.unlock([oid])
416 finally:
417 self._lock.release()
418
420 """Returns a list of job ids that changed since the last call of this function.
421 On first invocation returns a list of all ids.
422 "name" should be a unique identifier of the user of this information."""
423
424 self._lock.acquire()
425 try:
426 if self._started and time.time() > self._update_index_timer + self.update_index_time:
427 changed_ids = self.repository.update_index()
428 for d in self.changed_ids.itervalues():
429 d.update(changed_ids)
430 self._update_index_timer = time.time()
431 res = self.changed_ids.get(name,Set(self.ids()))
432 self.changed_ids[name] = Set()
433 return res
434 finally:
435 self._lock.release()
436
438 """Returns a dictionary to be put into obj._index_cache
439 This can and should be overwritten by derived Registries to provide more index values."""
440 return {}
441
443 """Connect the repository to the registry. Called from Repository_runtime.py"""
444 self._lock.acquire()
445 try:
446 t0 = time.time()
447 self.repository = makeRepository(self)
448 self._objects = self.repository.objects
449 self._incomplete_objects = self.repository.incomplete_objects
450
451 if self._needs_metadata:
452 if self.metadata is None:
453 self.metadata = Registry(self.name+".metadata", "Metadata repository for %s"%self.name, dirty_flush_counter=self.dirty_flush_counter, update_index_time = self.update_index_time)
454 self.metadata.type = self.type
455 self.metadata.location = self.location
456 self.metadata._parent = self
457 self.metadata.startup()
458
459 self.repository.startup()
460
461 self.changed_ids = {}
462 t1 = time.time()
463 logger.info("Registry '%s' [%s] startup time: %s sec" % (self.name, self.type, t1-t0))
464 self._started = True
465 finally:
466 self._lock.release()
467
469 """Flush and disconnect the repository. Called from Repository_runtime.py """
470 self._lock.acquire()
471 try:
472 try:
473 if not self.metadata is None:
474 self.metadata.shutdown()
475 except Exception, x:
476 logger.error("Exception on shutting down metadata repository '%s' registry: %s", self.name, x)
477 try:
478 self._flush()
479 except Exception, x:
480 logger.error("Exception on flushing '%s' registry: %s", self.name, x)
481 self._started = False
482 for obj in self._objects.values():
483 obj._registry_locked = False
484 self.repository.shutdown()
485 finally:
486 self._lock.release()
487
def info(self,full=False):
    """Return an informative one-line string about this registry.

    The string contains the registry name and the number of objects.
    full -- if True, the other concurrent sessions reported by the repository
            (if there are any) are appended to the string.
    """
    self._lock.acquire()
    try:
        summary = "registry '%s': %i objects" % (self.name, len(self._objects))
        if not full:
            return summary
        sessions = self.repository.get_other_sessions()
        if len(sessions) == 0:
            return summary
        return summary + ", %i other concurrent sessions:\n * %s" % (len(sessions), "\n * ".join(sessions))
    finally:
        self._lock.release()
500
502 other_sessions = self.repository.get_other_sessions()
503 if len(other_sessions) > 0:
504 logger.warning("%i other concurrent sessions:\n * %s" % (len(other_sessions), "\n * ".join(other_sessions)))
505