Package Ganga :: Package Utility :: Module Caching
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Utility.Caching

  1  from os import stat 
  2  import time 
  3  import new 
  4  from Ganga.Utility.logging import getLogger 
  5  from Ganga.Utility.Config import getConfig, makeConfig, ConfigError 
  6   
  7  try: 
  8      from threading import Lock 
  9  except ImportError: 
 10      from dummy_threading import Lock 
 11   
 12  NOT_INITIALIZED = object() 
 13   
 14  log = getLogger() 
 15  config = makeConfig('Caching','Caching for DQ2 dataset') 
 16  config.addOption('CacheLifeTime', 150, 'Cache refresh time in seconds') 
 17  config.addOption('CacheMaxEntry', 10, 'For CacheMaxEntry == 0,  the cache is unbounded, otherwise  the Least Recently Used (LRU) entry is discarded.') 
 18   
 19   
class Entry(object):
    """A cache entry, mostly an internal object.

    Attributes:
        _key: key under which the entry is stored in the cache dictionary.
        _timestamp: creation time of the entry (seconds since the epoch).
        _value: cached value, or NOT_INITIALIZED until one has been stored.
        _lock: per-entry lock serialising access to the value.
        _previous, _next: neighbours in a cache's access (LRU) list, or
            None while the entry is not linked into any list.
    """

    def __init__(self, key):
        object.__init__(self)
        self._key = key
        self._timestamp = time.time()
        self._value = NOT_INITIALIZED
        self._lock = Lock()
        # Always define the list links.  Entries created by an unbounded
        # cache used to lack them, which raised AttributeError as soon as a
        # bounded cache sharing the same dictionary touched such an entry.
        self._previous = None
        self._next = None
28
class Cache(object):
    """An abstract, multi-threaded cache object.

    Subclasses must implement ``build`` and may override ``key``, ``check``
    and ``commit``.

    ``cacheDict`` is a class attribute: every instance that does not
    override it stores its entries in the same dictionary.  The access
    (LRU) list, on the other hand, is kept per instance, so the list
    handling below tolerates entries that were created, moved or removed
    through another instance.
    """

    # Storage shared by all instances (see the class docstring).
    cacheDict = {}

    def __init__(self, dset, max_size=0):
        """Builds a cache with a limit of max_size entries.

        If this limit is exceeded, the Least Recently Used entry is
        discarded.  If max_size == 0, the cache is unbounded (no LRU rule
        is applied).

        dset is the dataset (or a GangaList of datasets) that ``key``
        prepends to every name.
        """
        object.__init__(self)
        self._maxsize = max_size
        self._dict = self.cacheDict
        self._dset = dset
        self._lock = Lock()

        # Header of the access list: a circular doubly linked list whose
        # sentinel initially points at itself.
        if self._maxsize:
            self._head = Entry(None)
            self._head._previous = self._head
            self._head._next = self._head

    def __setitem__(self, name, value):
        """Populates the cache with a given name and value."""
        key = self.key(name)
        entry = self._get_entry(key)

        entry._lock.acquire()
        try:
            self._pack(entry, value)
            self.commit()
        finally:
            entry._lock.release()

    def __getitem__(self, name):
        """Gets a value from the cache, builds it if required."""
        return self._checkitem(name)[2]

    def __delitem__(self, name):
        """Removes the entry stored for name.

        Raises KeyError if there is no such entry.
        """
        self._lock.acquire()
        try:
            # Entries are stored under str(key) (see _get_entry), so the
            # same conversion is required here; deleting with the raw
            # tuple key could never find the entry.
            key = str(self.key(name))
            entry = self._dict.pop(key)
            if self._maxsize:
                # Keep the access list consistent with the dictionary.
                self._unlink(entry)
        finally:
            self._lock.release()

    def _get_entry(self, key):
        """Returns the entry stored for key, creating it if necessary.

        For bounded caches the entry becomes the most recently used one,
        and creating an entry may evict the least recently used one.
        """
        self._lock.acquire()
        try:
            key = str(key)
            entry = self._dict.get(key)
            if entry is None:
                entry = Entry(key)
                self._dict[key] = entry
                if self._maxsize:
                    entry._next = entry._previous = None
                    self._access(entry)
                    self._checklru()
            elif self._maxsize:
                self._access(entry)
            return entry
        finally:
            self._lock.release()

    def _checkitem(self, name):
        """Gets a value from the cache, builds it if required.

        Returns a tuple is_new, key, value, entry.
        If is_new is True, the result had to be rebuilt.
        """
        key = self.key(name)
        entry = self._get_entry(key)
        # NOTE(review): the slice assumes key() prepended exactly one
        # dataset element; a GangaList with a different number of items
        # would leave dataset items in (or strip name items from) the
        # value handed to check/build -- TODO confirm with callers.
        subkey = key[1:]

        entry._lock.acquire()
        try:
            value = self._unpack(entry)
            is_new = False
            if value is NOT_INITIALIZED:
                # First access: the value always has to be built.
                opened = self.check(subkey, name, entry)
                value = self.build(subkey, name, opened, entry)
                is_new = True
                self._pack(entry, value)
                self.commit()
            else:
                # Existing value: rebuild only if check() reports it stale.
                opened = self.check(subkey, name, entry)
                if opened is not None:
                    value = self.build(subkey, name, opened, entry)
                    is_new = True
                    self._pack(entry, value)
                    self.commit()
                    log.debug("Cache is being refreshed for dataset %s" % str(key))

                log.debug("Value is reading from cache for its key %s" % str(key))

            return is_new, subkey, value, entry
        finally:
            entry._lock.release()

    def mru(self):
        """Returns the Most Recently Used key (None for unbounded caches)."""
        if not self._maxsize:
            return None
        self._lock.acquire()
        try:
            return self._head._previous._key
        finally:
            self._lock.release()

    def lru(self):
        """Returns the Least Recently Used key (None for unbounded caches)."""
        if not self._maxsize:
            return None
        self._lock.acquire()
        try:
            return self._head._next._key
        finally:
            self._lock.release()

    def key(self, name):
        """Override this method to extract a key from the name passed to
        the [] operator.

        The default implementation prepends the dataset(s) given to the
        constructor to name, which therefore has to be a tuple.
        """
        dsetN = self._dset

        if dsetN.__class__.__name__ == 'GangaList':
            dataset = tuple(dsetN)
        else:
            dataset = (dsetN,)

        return dataset + name

    def commit(self):
        """Override this method if you want to do something each time the
        underlying dictionary is modified (e.g. make it persistent).
        """
        pass

    def clear(self):
        """Clears the cache."""
        self._lock.acquire()
        try:
            self._dict.clear()
            if self._maxsize:
                # Reset the access list to its empty state.
                self._head._next = self._head
                self._head._previous = self._head
        finally:
            self._lock.release()

    def check(self, key, name, entry):
        """Override this method to check whether the entry with the given
        name is stale.

        Return None if it is fresh or an opened resource if it is stale.
        The object returned will be passed to the 'build' method as the
        'opened' parameter.  Use the 'entry' parameter to store meta-data
        if required.  This method is invoked while the entry's lock is
        held.
        """
        return None

    def build(self, key, name, opened, entry):
        """Build the cached value with the given name from the given opened
        resource.

        Use entry to obtain or store meta-data if needed.  This method is
        invoked while the entry's lock is held.
        """
        raise NotImplementedError()

    def _unlink(self, entry):
        """Internal use only, must be invoked within a cache lock.
        Detaches entry from whatever access list it is linked into.
        """
        # getattr: entries created by an unbounded cache sharing the same
        # dictionary may not carry link attributes at all.
        previous = getattr(entry, '_previous', None)
        following = getattr(entry, '_next', None)
        if previous is not None and following is not None:
            previous._next = following
            following._previous = previous
        entry._previous = entry._next = None

    def _access(self, entry):
        """Internal use only, must be invoked within a cache lock.
        Updates the access list.
        """
        if getattr(entry, '_next', None) is self._head:
            # Already the most recently used entry of this list.
            return
        # Remove the entry from the access list (no-op if it is not linked).
        self._unlink(entry)
        # Insert the entry at the end of the access list.
        tail = self._head._previous
        entry._previous = tail
        tail._next = entry
        entry._next = self._head
        self._head._previous = entry

    def _checklru(self):
        """Internal use only, must be invoked within a cache lock.
        Removes the LRU entry if needed.
        """
        if len(self._dict) <= self._maxsize:
            return
        lru = self._head._next
        if lru is self._head:
            # Nothing is linked into this instance's list.
            return
        self._unlink(lru)
        # pop() instead of del: the dictionary is shared, so the entry may
        # already have been removed through another instance.
        self._dict.pop(lru._key, None)
        log.debug("Old entry %s in cache is being deleted for creating space for new one" %str(lru._key))

    def _pack(self, entry, value):
        """Store the value in the entry."""
        entry._value = value

    def _unpack(self, entry):
        """Recover the value from the entry, returns NOT_INITIALIZED if it
        is not OK.
        """
        return entry._value
222
class FunctionCache(Cache):
    """Cache that memoises the results of calling a function.

    Calling the instance with some arguments returns the cached result for
    those arguments, invoking the wrapped function when no result is cached
    yet or when the cached one is older than the 'CacheLifeTime'
    configuration option (in seconds).
    """

    # NOTE: the max_size default is evaluated once, when the class is
    # defined; later changes of the 'CacheMaxEntry' option do not affect it.
    def __init__(self, function, dset, max_size=config['CacheMaxEntry']):
        """Wraps function; dset and max_size are passed on to Cache."""
        Cache.__init__(self, dset, max_size)
        self.function = function

    def __call__(self, *args, **kw):
        """Returns function(*args, **kw), using the cache when possible."""
        if kw:
            # A dict is not hashable so we build a tuple of (key, value)
            # pairs.  The pairs are sorted so that the cache key does not
            # depend on the order in which the keywords were given.
            kw = tuple(sorted(kw.items()))
            return self[args, kw]
        return self[args, ()]

    def check(self, key, name, entry):
        """Reports whether the cached value has expired.

        Returns None while the value is fresh or not built yet; otherwise
        restarts the entry's timestamp and returns a marker string, which
        causes the value to be rebuilt.
        """
        if entry._value is NOT_INITIALIZED:
            return None

        age = time.time() - entry._timestamp
        if age > config['CacheLifeTime']:
            entry._timestamp = time.time()
            return "Replacement of key in cache"
        return None

    def build(self, key, name, opened, entry):
        """Computes the value by calling the wrapped function.

        key is the (args, kw) pair assembled by __call__.
        """
        args, kw = key
        return self.function(*args, **dict(kw))


"""
#Function for caching can be used in the following way ...
#(the second constructor argument is the dataset the cached values belong to)
def compute(n):
    print("Sleeping for %s seconds" % n)
    time.sleep(n)
    return "Done ........"

func = FunctionCache(compute, 'some.dataset.name')
print(func(2))
print(func(3))
print(func(2))
"""