1 from os import stat
2 import time
3 import new
4 from Ganga.Utility.logging import getLogger
5 from Ganga.Utility.Config import getConfig, makeConfig, ConfigError
6
7 try:
8 from threading import Lock
9 except ImportError:
10 from dummy_threading import Lock
11
# Unique sentinel object: an entry whose _value is this object has not had
# its value built yet (compared with "is", never with "==").
12 NOT_INITIALIZED = object()
13
# Logger used for the debug messages emitted by the cache.
14 log = getLogger()
# 'Caching' configuration section with its two options: the refresh interval
# in seconds and the maximum number of entries (0 means unbounded).
15 config = makeConfig('Caching','Caching for DQ2 dataset')
16 config.addOption('CacheLifeTime', 150, 'Cache refresh time in seconds')
17 config.addOption('CacheMaxEntry', 10, 'For CacheMaxEntry == 0, the cache is unbounded, otherwise the Least Recently Used (LRU) entry is discarded.')
18
19
21 """ A cache entry, mostly an internal object. """
def __init__(self, key):
    """Create an empty entry for *key*.

    The entry starts out holding the NOT_INITIALIZED sentinel as its
    value, remembers the time at which it was created and owns a lock
    of its own.
    """
    object.__init__(self)
    # Identity of the entry.
    self._key = key
    # Payload placeholder until a real value is stored.
    self._value = NOT_INITIALIZED
    # Creation time, in seconds since the epoch.
    self._timestamp = time.time()
    # Per-entry lock.
    self._lock = Lock()
28
30 """ An abstract, multi-threaded cache object. """
31
32 cacheDict = {}
33
35 """ Builds a cache with a limit of max_size entries.
36 If this limit is exceeded, the Least Recently Used entry is discarded.
37 if max_size==0, the cache is unbounded (no LRU rule is applied).
38 """
39 object.__init__(self)
40 self._maxsize=max_size
41 self._dict=self.cacheDict
42 self._dset=dset
43 self._lock=Lock()
44
45
46 if self._maxsize:
47 self._head=Entry(None)
48 self._head._previous=self._head
49 self._head._next=self._head
50
63
65 """ Gets a value from the cache, builds it if required.
66 """
67 return self._checkitem(name)[2]
68
76
def _get_entry(self, key):
    """Return the entry stored under str(key), creating it when absent.

    The whole lookup runs while holding the cache lock. When the cache
    is bounded (self._maxsize is non-zero) the returned entry is marked
    as accessed, and a newly created entry additionally triggers the
    size check.
    """
    self._lock.acquire()
    try:
        lookup = str(key)
        found = self._dict.get(lookup)

        if found:
            # Existing entry: only the access list needs refreshing.
            if self._maxsize:
                self._access(found)
            return found

        # No usable entry yet: register a fresh one under the same key.
        fresh = Entry(lookup)
        self._dict[lookup] = fresh
        if self._maxsize:
            # Start unlinked so that _access() can tell the entry is new.
            fresh._previous = None
            fresh._next = None
            self._access(fresh)
            self._checklru()
        return fresh
    finally:
        self._lock.release()
94
96 """ Gets a value from the cache, builds it if required.
97 Returns a tuple is_new, key, value, entry.
98 If is_new is True, the result had to be rebuilt.
99 """
100 key = self.key(name)
101 entry = self._get_entry(key)
102
103 entry._lock.acquire()
104 try:
105 value = self._unpack(entry)
106 is_new = False
107 if value is NOT_INITIALIZED:
108 opened = self.check(key[1:], name, entry)
109 value = self.build(key[1:], name, opened, entry)
110 is_new = True
111 self._pack(entry, value)
112 self.commit()
113 else:
114 opened = self.check(key[1:], name, entry)
115 if opened is not None:
116 value = self.build(key[1:], name, opened, entry)
117 is_new = True
118 self._pack(entry, value)
119 self.commit()
120 log.debug("Cache is being refreshed for dataset %s" % str(key))
121
122 log.debug("Value is reading from cache for its key %s" % str(key))
123
124 return is_new, key[1:], value, entry
125 finally:
126 entry._lock.release()
127
129 """ Returns the Most Recently Used key """
130 if self._maxsize:
131 self._lock.acquire()
132 try:
133 return self._head._previous._key
134 finally:
135 self._lock.release()
136 else:
137 return None
138
140 """ Returns the Least Recently Used key """
141 if self._maxsize:
142 self._lock.acquire()
143 try:
144 return self._head._next._key
145 finally:
146 self._lock.release()
147 else:
148 return None
149
def key(self, name):
    """Build the cache key for *name* (the value passed to the [] operator).

    The key is the dataset part followed by *name*: a dataset object
    whose class is called 'GangaList' contributes all of its items, any
    other dataset object contributes itself as a single item. *name*
    must be a tuple, since it is concatenated onto the dataset tuple.
    Override this method to derive the key differently.
    """
    source = self._dset

    # The class is recognised by name only; no import of GangaList is needed.
    if source.__class__.__name__ != 'GangaList':
        return (source,) + name
    return tuple(source) + name
161
163 """ Override this method if you want to do something each time the underlying dictionary is modified (e.g. make it persistent). """
164 pass
165
167 """ Clears the cache """
168 self._lock.acquire()
169 try:
170 self._dict.clear()
171 if self._maxsize:
172 self._head._next=self._head
173 self._head._previous=self._head
174 finally:
175 self._lock.release()
176
def check(self, key, name, entry):
    """Staleness hook; this default implementation treats every entry as fresh.

    Override it to decide whether the entry for *name* must be rebuilt:
    return None when the entry is still fresh, or an opened resource when
    it is stale. Whatever is returned is handed to 'build' as its 'opened'
    argument. *entry* may be used to keep meta-data. According to the
    original author this method is isolated from other threads working on
    the same name.
    """
    return None
184
def build(self, key, name, opened, entry):
    """Produce the value to cache for *name*; must be overridden.

    *opened* is the resource handed over by 'check', and *entry* can be
    used to read or store meta-data. According to the original author
    this method is isolated from other threads working on the same name.

    Raises NotImplementedError in this base implementation.
    """
    raise NotImplementedError()
190
192 " Internal use only, must be invoked within a cache lock. Updates the access list. """
193 if entry._next is not self._head:
194 if entry._previous is not None:
195
196 entry._previous._next=entry._next
197 entry._next._previous=entry._previous
198
199 entry._previous=self._head._previous
200 entry._previous._next=entry
201 entry._next=self._head
202 entry._next._previous=entry
203 if self._head._next is self._head:
204 self._head._next=entry
205
207 " Internal use only, must be invoked within a cache lock. Removes the LRU entry if needed. """
208 if len(self._dict)>self._maxsize:
209 lru=self._head._next
210 lru._previous._next=lru._next
211 lru._next._previous=lru._previous
212 del self._dict[lru._key]
213 log.debug("Old entry %s in cache is being deleted for creating space for new one" %str(lru._key))
214
def _pack(self, entry, value):
    """Attach *value* to *entry* as its cached payload (entry._value)."""
    setattr(entry, '_value', value)
218
220 """ Recover the value from the entry, returns NOT_INITIALIZED if it is not OK. """
221 return entry._value
222
def __init__(self, function, dset, max_size=None):
    """Set up a cache around *function* for the dataset(s) *dset*.

    function -- the function this cache wraps; stored as self.function.
    dset     -- dataset object passed through to Cache.__init__.
    max_size -- entry limit passed through to Cache.__init__. When it is
                omitted (None), the 'CacheMaxEntry' option is read from
                the configuration at the time of this call.
    """
    # A default written in the signature is evaluated only once, when the
    # 'def' statement runs, which would freeze the option at import time.
    # Looking it up here keeps it live, like 'CacheLifeTime' in check().
    if max_size is None:
        max_size = config['CacheMaxEntry']
    Cache.__init__(self, dset, max_size)
    self.function = function
227
229 if kw:
230
231 kw = tuple(kw.iteritems())
232 return self[args, kw]
233 else:
234 return self[args, ()]
235
def check(self, key, name, entry):
    """Report whether *entry* has outlived the configured cache lifetime.

    Returns None when the entry holds no value yet, or when its age does
    not exceed the 'CacheLifeTime' option (read on every call). Otherwise
    the entry's timestamp is reset to the current time and a descriptive
    string is returned.
    """
    if entry._value is not NOT_INITIALIZED:
        age = time.time() - entry._timestamp
        if age > config['CacheLifeTime']:
            # Restart the lifetime clock before signalling the refresh.
            entry._timestamp = time.time()
            return "Replacement of key in cache"
    return None
247
248 - def build(self, key, name, opened, entry):
251
252
253 """
254 #Function for caching can be used in the following way ...
255 def compute(n):
256 print "Sleeping for ", n, " seconds"
257 time.sleep(n)
258 return "Done ........"
259
260 func = FunctionCache(compute, 'my.dataset')
261 print func(2)
262 print func(3)
263 print func(2)
264 """
265