import os, time, errno, threading, fcntl, random

import sys
if sys.hexversion >= 0x020600F0:
    Set = set
else:
    from sets import Set

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    from Ganga.Core.GangaThread import GangaThread
    from Ganga.Core.GangaRepository import RepositoryError
    import Ganga.Utility.logging
    logger = Ganga.Utility.logging.getLogger()
except ImportError:
    print "IMPORT ERROR SHOULD NOT OCCUR IN PRODUCTION CODE!!!!!!!!!!!!!!!!!!!!!!"
    from threading import Thread
    # Minimal stand-ins so the module can still be exercised without a full Ganga installation.
    class GangaThread(Thread):
        def __init__(self, name, critical=False):
            Thread.__init__(self, name=name)
        def should_stop(self):
            return False
        def unregister(self):
            pass

    class RepositoryError(Exception):
        def __init__(self, repo, what):
            Exception.__init__(self, what)
            self.repo = repo

    class Logger:
        def _out(self, msg):
            print msg
        debug = info = warning = error = _out

    logger = Logger()

session_expiration_timeout = 40
session_lock_refresher = None

class SessionLockRefresher(GangaThread):
    def __init__(self, session_name, sdir, fn, repo):
        GangaThread.__init__(self, name='SessionLockRefresher', critical=False)
        self.session_name = session_name
        self.sdir = sdir
        self.fn = fn
        self.repos = [repo]

    def run(self):
        try:
            while not self.should_stop():
                try:
                    # Touch our own session file so that other sessions see this one as alive.
                    try:
                        oldnow = os.stat(self.fn).st_ctime
                        os.utime(self.fn, None)
                        now = os.stat(self.fn).st_ctime
                    except OSError, x:
                        if x.errno != errno.ENOENT:
                            logger.debug("Session file timestamp could not be updated! Locks could be lost!")
                        else:
                            raise RepositoryError(self.repos[0], "Own session file not found! Possibly deleted by another ganga session.\n\
Possible reasons could be that this computer has a very high load, or that the system clocks on computers running Ganga are not synchronized.\n\
On computers with very high load and on network filesystems, try to avoid running concurrent ganga sessions for long.")

                    # Clean up expired session files, but only while the monitoring loop is active.
                    try:
                        from Ganga.Core import monitoring_component
                        if monitoring_component is not None and monitoring_component.enabled:
                            ls_sdir = os.listdir(self.sdir)
                            session_files = [f for f in ls_sdir if f.endswith(".session")]
                            lock_files = [f for f in ls_sdir if f.endswith(".locks")]
                            # Iterate over a copy, since expired entries are removed from the list.
                            for sf in session_files[:]:
                                if os.path.join(self.sdir, sf) == self.fn:
                                    continue
                                mtm = os.stat(os.path.join(self.sdir, sf)).st_ctime
                                if now - mtm > session_expiration_timeout:
                                    logger.warning("Removing session %s because of inactivity (no update since %s seconds)" % (sf, now - mtm))
                                    os.unlink(os.path.join(self.sdir, sf))
                                    session_files.remove(sf)

                            # Remove lock files whose owning session file no longer exists.
                            for f in lock_files:
                                if not f.endswith(".session"):
                                    asf = f.split(".session")[0] + ".session"
                                    if asf not in session_files:
                                        os.unlink(os.path.join(self.sdir, f))
                    except OSError, x:
                        # Not serious: most likely another session cleaned up before we did.
                        logger.info("Unimportant OSError in loop: %s" % x)
                except RepositoryError:
                    break
                except Exception, x:
                    logger.warning("Internal exception in session lock thread: %s %s" % (x.__class__.__name__, x))
                time.sleep(1 + random.random())
        finally:
            # On shutdown, remove our own session file so other sessions can clean up our locks.
            try:
                os.unlink(self.fn)
            except OSError, x:
                logger.debug("Session file was deleted already or removal failed: %s" % (x))
            self.unregister()
            global session_lock_refresher
            session_lock_refresher = None

    def addRepo(self, repo):
        self.repos.append(repo)


class SessionLockManager(object):
125 """ Class with thread that keeps a global lock file that synchronizes
126 ID and counter access across Ganga sessions.
127 DEVELOPER WARNING: On NFS, files that are not locked with lockf (NOT flock) will
128 NOT be synchronized across clients, even if a global lock file is used!
129 Interface:
130 * startup() starts the session, automatically called on init
131 * shutdown() stops the thread, FREES ALL LOCKS
132 * make_new_ids(n) returns n new (locked) ids
133 * lock_ids(ids) returns the ids that were successfully locked
134 * release_ids(ids) returns the ids that were successfully released (now: all)
135 All access to an instance of this class MUST be synchronized!
136 Should ONLY raise RepositoryError (if possibly-corrupting errors are found)
137 """
139 """Make sure the given directory exists"""
140 try:
141 os.makedirs(dn)
142 except OSError, x:
143 if x.errno != errno.EEXIST:
144 raise RepositoryError(self.repo, "OSError on directory create: %s" % x)
145

    def __init__(self, repo, root, name, minimum_count=0):
        self.repo = repo
        self.name = name
        self.count = minimum_count
        self.locked = Set()
        self.realpath = os.path.realpath(root)
        self.sdir = os.path.join(self.realpath, "sessions")
        # Locking via fcntl.lockf is skipped on AFS, where byte-range locks are unreliable
        # (assumption: AFS is detected by the path prefix).
        self.afs = (self.realpath[:4] == "/afs")
        # Session names encode host, start time in ms and pid; see session_to_info().
        self.session_name = ".".join([os.uname()[1], str(int(time.time() * 1000)), str(os.getpid()), "session"])
        self.gfn = os.path.join(self.sdir, self.session_name)                          # own session file
        self.fn = os.path.join(self.sdir, self.session_name + "." + name + ".locks")   # per-repository lock file
        self.cntfn = os.path.join(self.realpath, name, "cnt")                          # id counter file
        self.mkdir(os.path.join(self.realpath, "sessions"))
        self.mkdir(os.path.join(self.realpath, self.name))

        self.global_lock_setup()
        self.global_lock_acquire()
        try:
            # Create the counter file if it does not exist, then load (or repair) the count.
            if not os.path.exists(self.cntfn):
                try:
                    fd = os.open(self.cntfn, os.O_EXCL | os.O_CREAT | os.O_WRONLY)
                    os.write(fd, "0")
                    os.close(fd)
                except OSError, x:
                    if x.errno != errno.EEXIST:
                        raise RepositoryError(self.repo, "OSError on count file create: %s" % x)
            try:
                self.count = max(self.count, self.cnt_read())
            except ValueError:
                logger.error("Corrupt count file '%s'! Trying to recover..." % (self.cntfn))
            except OSError, x:
                raise RepositoryError(self.repo, "OSError on count file '%s' access!" % (self.cntfn))
            self.cnt_write()

            # Create our own session file (initially holding an empty set of locked ids).
            try:
                fd = os.open(self.fn, os.O_EXCL | os.O_CREAT | os.O_WRONLY)
                os.write(fd, pickle.dumps(Set()))
                os.close(fd)
            except OSError, x:
                raise RepositoryError(self.repo, "Error on session file '%s' creation: %s" % (self.fn, x))
            global session_lock_refresher
            if session_lock_refresher is None:
                try:
                    os.close(os.open(self.gfn, os.O_EXCL | os.O_CREAT | os.O_WRONLY))
                except OSError, x:
                    raise RepositoryError(self.repo, "Error on session file '%s' creation: %s" % (self.gfn, x))
                session_lock_refresher = SessionLockRefresher(self.session_name, self.sdir, self.gfn, self.repo)
                session_lock_refresher.start()
            else:
                session_lock_refresher.addRepo(self.repo)
            self.session_write()
        finally:
            self.global_lock_release()

221 """Shutdown the thread and locking system (on ganga shutdown or repo error)"""
222 self.locked = Set()
223 try:
224 os.unlink(self.fn)
225 except OSError, x:
226 logger.debug("Session file '%s' was deleted already or removal failed: %s" % (self.fn,x))
227
    def global_lock_setup(self):
        self.lockfn = os.path.join(self.sdir, "global_lock")
        try:
            file(self.lockfn, "w").close()  # create the lock file if it does not exist yet
            self.lockfd = os.open(self.lockfn, os.O_RDWR)
        except IOError, x:
            raise RepositoryError(self.repo, "Could not create lock file '%s': %s" % (self.lockfn, x))
        except OSError, x:
            raise RepositoryError(self.repo, "Could not open lock file '%s': %s" % (self.lockfn, x))

    def global_lock_acquire(self):
        try:
            fcntl.lockf(self.lockfd, fcntl.LOCK_EX)
        except IOError, x:
            raise RepositoryError(self.repo, "IOError on lock ('%s'): %s" % (self.lockfn, x))

    def global_lock_release(self):
        try:
            fcntl.lockf(self.lockfd, fcntl.LOCK_UN)
        except IOError, x:
            raise RepositoryError(self.repo, "IOError on unlock ('%s'): %s" % (self.lockfn, x))

    def session_read(self, fn):
        """ Reads a session file and returns the set of IDs locked by that session.
        The global lock MUST be held while calling this function; on NFS additional
        lockf locking is done on the file itself.
        Raises RepositoryError if severe access problems occur (corruption otherwise!) """
        try:
            fd = os.open(fn, os.O_RDONLY)
            try:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_SH)
                try:
                    return pickle.loads(os.read(fd, 104857600))
                except Exception, x:
                    logger.warning("corrupt or inaccessible session file '%s' - ignoring it (Exception %s %s)." % (fn, x.__class__.__name__, x))
            finally:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_UN)
                os.close(fd)
        except OSError, x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "Error on session file access '%s': %s" % (fn, x))
        return Set()

    def session_write(self):
        """ Writes the locked set to the session file.
        The global lock MUST be held while calling this function; on NFS additional
        lockf locking is done on the file itself.
        Raises RepositoryError if the session file is inaccessible """
        try:
            # If this fails the repository must be shut down, since locks can no longer be guaranteed.
            fd = os.open(self.fn, os.O_WRONLY)
            if not self.afs:
                fcntl.lockf(fd, fcntl.LOCK_EX)
            os.write(fd, pickle.dumps(self.locked))
            if not self.afs:
                fcntl.lockf(fd, fcntl.LOCK_UN)
            os.close(fd)
        except OSError, x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "Error on session file access '%s': %s" % (self.fn, x))
            else:
                raise RepositoryError(self.repo, "Own session file not found! Possibly deleted by another ganga session.\n\
Possible reasons could be that this computer has a very high load, or that the system clocks on computers running Ganga are not synchronized.\n\
On computers with very high load and on network filesystems, try to avoid running concurrent ganga sessions for long.")
        except IOError, x:
            raise RepositoryError(self.repo, "Error on session file locking '%s': %s" % (self.fn, x))

301 """ Tries to read the counter file.
302 Raises ValueError (invalid contents)
303 Raises OSError (no access/does not exist)
304 Raises RepositoryError (fatal)
305 """
        try:
            fd = os.open(self.cntfn, os.O_RDONLY)
            try:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_SH)
                return int(os.read(fd, 100).split("\n")[0])
            finally:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_UN)
                os.close(fd)
        except OSError, x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "OSError on count file '%s' read: %s" % (self.cntfn, x))
            else:
                raise
        except IOError, x:
            raise RepositoryError(self.repo, "Locking error on count file '%s' read: %s" % (self.cntfn, x))

325 """ Writes the counter to the counter file.
326 The global lock MUST be held for this function to work correctly
327 Raises OSError if count file is inaccessible """
        try:
            # If this fails the counter is no longer guaranteed to be consistent.
            fd = os.open(self.cntfn, os.O_WRONLY)
            if not self.afs:
                fcntl.lockf(fd, fcntl.LOCK_EX)
            os.write(fd, str(self.count) + "\n")
            if not self.afs:
                fcntl.lockf(fd, fcntl.LOCK_UN)
            os.close(fd)
        except OSError, x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "OSError on count file '%s' write: %s" % (self.cntfn, x))
            else:
                raise RepositoryError(self.repo, "Count file '%s' not found! Repository was modified externally!" % (self.cntfn))
        except IOError, x:
            raise RepositoryError(self.repo, "Locking error on count file '%s' write: %s" % (self.cntfn, x))

347 """ Locks the next n available ids and returns them as a list
348 Raise RepositoryError on fatal error"""
        self.global_lock_acquire()
        try:
            # Re-read the counter to pick up ids that other sessions have created meanwhile.
            try:
                newcount = self.cnt_read()
            except ValueError:
                logger.warning("Corrupt job counter (possibly due to crash of another session)! Trying to recover...")
                newcount = self.count
            except OSError:
                raise RepositoryError(self.repo, "Job counter deleted! External modification to repository!")
            if not newcount >= self.count:
                # This can only happen if the count file was deleted or modified externally.
                logger.warning("Internal counter increased - probably the count file was deleted.")
                newcount = self.count
            if self.locked and max(self.locked) >= newcount:
                newcount = max(self.locked) + 1
            ids = range(newcount, newcount + n)
            self.locked.update(ids)
            self.count = newcount + n
            self.cnt_write()
            self.session_write()
            return list(ids)
        finally:
            self.global_lock_release()

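    # lock_ids() and release_ids() belong to the interface promised in the class docstring.
    # The implementations below are a sketch, assuming the same bookkeeping used elsewhere
    # in this class: the locked set kept in self.locked and persisted via session_write(),
    # with other sessions' locks read from their '<name>.locks' files.
    def lock_ids(self, ids):
        """ Tries to lock the given ids; returns the list of ids that were actually locked """
        ids = Set(ids)
        self.global_lock_acquire()
        try:
            try:
                sessions = [sn for sn in os.listdir(self.sdir) if sn.endswith(self.name + ".locks")]
            except OSError, x:
                raise RepositoryError(self.repo, "Could not list session directory '%s': %s" % (self.sdir, x))
            # Any id already held by another session cannot be locked here.
            foreign = Set()
            for session in sessions:
                sf = os.path.join(self.sdir, session)
                if sf == self.fn:
                    continue
                foreign.update(self.session_read(sf))
            ids.difference_update(foreign)
            self.locked.update(ids)
            self.session_write()
            return list(ids)
        finally:
            self.global_lock_release()

    def release_ids(self, ids):
        """ Releases the given ids; returns the list of released ids (currently: all of them) """
        self.global_lock_acquire()
        try:
            self.locked.difference_update(ids)
            self.session_write()
            return list(ids)
        finally:
            self.global_lock_release()
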
    def check(self):
        """ Consistency check: asserts that no id is locked by more than one session
        and that the global counter is not smaller than our own count. """
        self.global_lock_acquire()
        try:
            f = file(self.cntfn)
            newcount = int(f.readline())
            f.close()
            assert newcount >= self.count
            sessions = os.listdir(self.sdir)
            prevnames = Set()
            for session in sessions:
                if not session.endswith(self.name + ".locks"):
                    continue
                try:
                    sf = os.path.join(self.sdir, session)
                    fd = -1
                    if not self.afs:
                        fd = os.open(sf, os.O_RDONLY)
                        fcntl.lockf(fd, fcntl.LOCK_SH)
                    names = pickle.load(file(sf))
                    if not self.afs and fd > 0:
                        fcntl.lockf(fd, fcntl.LOCK_UN)
                        os.close(fd)
                except Exception, x:
                    logger.warning("CHECKER: session file %s corrupted: %s %s" % (session, x.__class__.__name__, x))
                    continue
                if not len(names & prevnames) == 0:
                    print "Double-locked stuff:", names & prevnames
                    assert False
                prevnames.update(names)

        finally:
            self.global_lock_release()

    def get_lock_session(self, id):
        """get_lock_session(id)
        Tries to determine the session that holds the lock on id, for information purposes,
        and returns an informative string.
        Returns None on failure
        """
        self.global_lock_acquire()
        try:
            sessions = [s for s in os.listdir(self.sdir) if s.endswith(self.name + ".locks")]
            for session in sessions:
                try:
                    sf = os.path.join(self.sdir, session)
                    fd = -1
                    if not self.afs:
                        fd = os.open(sf, os.O_RDONLY)
                        fcntl.lockf(fd, fcntl.LOCK_SH)
                    names = pickle.load(file(sf))
                    if not self.afs and fd > 0:
                        fcntl.lockf(fd, fcntl.LOCK_UN)
                        os.close(fd)
                    if id in names:
                        return self.session_to_info(session)
                except Exception, x:
                    continue
        finally:
            self.global_lock_release()

    def get_session_list(self):
        """get_session_list()
        Tries to determine the other sessions that are active and returns an informative string for each of them.
        """
        self.global_lock_acquire()
        try:
            sessions = [s for s in os.listdir(self.sdir) if s.endswith(".session") and not os.path.join(self.sdir, s) == self.gfn]
            return [self.session_to_info(session) for session in sessions]
        finally:
            self.global_lock_release()

    def reap_locks(self):
        """reap_locks() --> True/False
        Remotely clear all foreign locks from the session.
        WARNING: This is not nice.
        Returns True on success, False on error."""
        failed = False
        self.global_lock_acquire()
        try:
            sessions = [s for s in os.listdir(self.sdir) if s.endswith(".session") and not os.path.join(self.sdir, s) == self.gfn]
            for session in sessions:
                try:
                    sf = os.path.join(self.sdir, session)
                    os.unlink(sf)
                except OSError, x:
                    failed = True
            return not failed
        finally:
            self.global_lock_release()

    def session_to_info(self, session):
        si = session.split(".")
        try:
            return "%s (pid %s) since %s" % (".".join(si[:-3]), si[-2], time.ctime(int(si[-3]) / 1000))
        except Exception:
            return session
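
    # Worked example (filename layout assumed from the parsing above:
    # "<host>.<start time in ms>.<pid>.session"):
    #   "lxplus001.cern.ch.1386000000000.12345.session"
    #   -> "lxplus001.cern.ch (pid 12345) since <local time for epoch 1386000000>"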

def test1():
    # Stress test: repeatedly lock and release random id sets and run the checker.
    # repo=None: no repository object is needed for this standalone test.
    slm = SessionLockManager(None, "locktest", "tester")
    while True:
        print "lock ---", slm.lock_ids(random.sample(xrange(100), 3))
        print "unlock---", slm.release_ids(random.sample(xrange(100), 3))
        slm.check()

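# Sketch of a second test mode (assumption: it exercises make_new_ids(), the
# id-allocation path that test1 does not cover).
def test2():
    slm = SessionLockManager(None, "locktest", "tester")
    while True:
        n = random.randint(1, 9)
        print "new ids ---", slm.make_new_ids(n)
        slm.check()
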
import random
if __name__ == "__main__":
    import sys
    if len(sys.argv) == 1:
        print "Usage: python SessionLock.py {1|2}"
        sys.exit(-1)
    if sys.argv[1] == "1":
        test1()
    elif sys.argv[1] == "2":
        test2()