Package Ganga :: Package Core :: Package GangaRepository :: Module SessionLock
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Core.GangaRepository.SessionLock

  1  # This class (SessionLockManager) encapsulates fcntl-based locking that works on NFS, AFS and locally. 
  2  # You can use  
  3  # python SessionLock.py {1|2} 
  4  # to run locking tests (run several instances in the same directory, from different machines) 
  5   
  6  import os, time, errno, threading, fcntl, random 
  7   
  8  import sys 
  9  if sys.hexversion >= 0x020600F0: 
 10      Set = set 
 11  else: 
 12      from sets import Set 
 13   
 14  try: 
 15      import cPickle as pickle 
 16  except ImportError: 
 17      import pickle 
 18   
 19  try: 
 20      from Ganga.Core.GangaThread import GangaThread 
 21      from Ganga.Core.GangaRepository import RepositoryError 
 22      import Ganga.Utility.logging 
 23      logger = Ganga.Utility.logging.getLogger() 
 24  except ImportError: 
 25      print "IMPORT ERROR SHOULD NOT OCCUR IN PRODUCTION CODE!!!!!!!!!!!!!!!!!!!!!!" 
 26      from threading import Thread 
27 - class GangaThread(Thread):
28 - def __init__(self,name):
29 self.name = name 30 super(GangaThread,self).__init__()
31 - def should_stop(self):
32 return False
33
34 - class Logger:
35 - def warning(self,msg):
36 print msg
37 - def debug(self,msg):
38 print msg
39
40 - class RepositoryError(Exception):
41 pass
42 logger = Logger() 43 44 session_expiration_timeout = 40 # seconds 45 session_lock_refresher = None 46
class SessionLockRefresher(GangaThread):
    """Thread that keeps this session's marker file fresh.

    Roughly once per second (plus random jitter) the thread touches the
    session file ``fn`` so that other sessions can see it is still alive.
    When Ganga's monitoring component is enabled it also deletes the session
    files of other sessions that have not been refreshed for more than
    ``session_expiration_timeout`` seconds, together with any ``.locks``
    files whose session file no longer exists.

    On exit the thread removes its own session file, unregisters itself and
    resets the module-level ``session_lock_refresher`` to None.
    """

    def __init__(self, session_name, sdir, fn, repo):
        """
        session_name -- name of this session (shared by all repositories)
        sdir         -- directory holding the session and lock files
        fn           -- full path of this session's ".session" file
        repo         -- first repository using this refresher
        """
        GangaThread.__init__(self, name='SessionLockRefresher', critical=False)
        self.session_name = session_name
        self.sdir = sdir
        self.fn = fn
        self.repos = [repo]

    def run(self):
        """Refresh loop; ends when should_stop() is true or the own session file vanished."""
        global session_lock_refresher
        # Timestamp of the last successful refresh; None until the first one
        # succeeds, in which case the expiry pass is skipped.
        now = None
        try:
            while not self.should_stop():
                ## TODO: Check for services active/inactive
                try:
                    try:
                        os.utime(self.fn, None)
                        now = os.stat(self.fn).st_ctime
                    except OSError as x:
                        if x.errno != errno.ENOENT:
                            logger.debug("Session file timestamp could not be updated! Locks could be lost!")
                        else:
                            raise RepositoryError(self.repos[0], "Own session file not found! Possibly deleted by another ganga session.\n"
                                "Possible reasons could be that this computer has a very high load, or that the system clocks on computers running Ganga are not synchronized.\n"
                                "On computers with very high load and on network filesystems, try to avoid running concurrent ganga sessions for long.")

                    # Clear expired session files if monitoring is active
                    try:
                        from Ganga.Core import monitoring_component
                        if monitoring_component is not None and monitoring_component.enabled and now is not None:
                            self._remove_expired_sessions(now)
                    except OSError as x:
                        # nothing really important, another process deleted the session before we did.
                        logger.info("Unimportant OSError in loop: %s" % x)
                except RepositoryError:
                    break
                except Exception as x:
                    logger.warning("Internal exception in session lock thread: %s %s" % (x.__class__.__name__, x))
                time.sleep(1 + random.random())
        finally:
            # On shutdown remove session file
            try:
                os.unlink(self.fn)
            except OSError as x:
                logger.debug("Session file was deleted already or removal failed: %s" % (x))
            self.unregister()
            session_lock_refresher = None

    def _remove_expired_sessions(self, now):
        """Delete stale foreign session files and orphaned lock files.

        now -- ctime of the own session file right after it was refreshed;
               used as the reference time for judging other sessions.
        Raises OSError if a file disappears or cannot be removed mid-pass.
        """
        entries = os.listdir(self.sdir)
        # Build the list of surviving sessions separately instead of removing
        # from the list being iterated (which would skip entries).
        alive = []
        for sf in entries:
            if not sf.endswith(".session"):
                continue
            path = os.path.join(self.sdir, sf)
            if path != self.fn:
                age = now - os.stat(path).st_ctime
                if age > session_expiration_timeout:
                    logger.warning("Removing session %s because of inactivity (no update since %s seconds)" % (sf, age))
                    os.unlink(path)
                    continue
            alive.append(sf)
        # remove all lock files that do not belong to sessions that are alive
        for f in entries:
            if not f.endswith(".locks"):
                continue
            owner = f.split(".session")[0] + ".session"
            if owner not in alive:
                os.unlink(os.path.join(self.sdir, f))

    def addRepo(self, repo):
        """Register a further repository that shares this session."""
        self.repos.append(repo)
123
class SessionLockManager(object):
    """ Class with thread that keeps a global lock file that synchronizes
    ID and counter access across Ganga sessions.
    DEVELOPER WARNING: On NFS, files that are not locked with lockf (NOT flock) will
    NOT be synchronized across clients, even if a global lock file is used!
    Interface:
        * startup() starts the session (must be called explicitly; __init__ does not call it)
        * shutdown() removes this session's lock file, FREES ALL LOCKS
        * make_new_ids(n) returns n new (locked) ids
        * lock_ids(ids) returns the ids that were successfully locked
        * release_ids(ids) returns the ids that were successfully released (now: all)
    All access to an instance of this class MUST be synchronized!
    Should ONLY raise RepositoryError (if possibly-corrupting errors are found)
    """

    def mkdir(self, dn):
        """Make sure the given directory exists"""
        try:
            os.makedirs(dn)
        except OSError as x:
            if x.errno != errno.EEXIST:
                raise RepositoryError(self.repo, "OSError on directory create: %s" % x)

    def __init__(self, repo, root, name, minimum_count=0):
        """
        repo          -- repository object, passed on to RepositoryError
        root          -- root directory of the repository (created if missing)
        name          -- name of the registry; used in the lock and counter file paths
        minimum_count -- lower bound for the id counter
        """
        self.repo = repo
        self.mkdir(root)
        realpath = os.path.realpath(root)
        # Use the hostname (os.uname()[1]) and the current time in ms to construct the session filename.
        # TODO: Perhaps put the username here?
        global session_lock_refresher
        if session_lock_refresher is None:
            session_name = ".".join([os.uname()[1], str(int(time.time() * 1000)), str(os.getpid()), "session"])
        else:
            # All managers of one process share the refresher's session name.
            session_name = session_lock_refresher.session_name

        self.sdir = os.path.join(realpath, "sessions")
        self.gfn = os.path.join(self.sdir, session_name)                       # per-process session file
        self.fn = os.path.join(self.sdir, session_name + "." + name + ".locks")  # ids locked by this manager
        self.cntfn = os.path.join(realpath, name, "cnt")                       # id counter file

        self.afs = (realpath[:4] == "/afs")  # per-file lockf calls are skipped on AFS
        self.locked = Set()
        self.count = minimum_count
        self.session_name = session_name
        self.name = name
        self.realpath = realpath

    def startup(self):
        """Create directories, counter file and lock file, and start (or join) the refresher thread.
        Raises RepositoryError on any file access problem."""
        global session_lock_refresher
        # Ensure directories exist
        self.mkdir(os.path.join(self.realpath, "sessions"))
        self.mkdir(os.path.join(self.realpath, self.name))

        # setup global lock
        self.global_lock_setup()
        self.global_lock_acquire()
        try:
            # setup counter file if it does not exist, read it if it does
            if not os.path.exists(self.cntfn):
                try:
                    fd = os.open(self.cntfn, os.O_EXCL | os.O_CREAT | os.O_WRONLY)
                    try:
                        os.write(fd, b"0")  # bytes literal is a plain str on Python 2
                    finally:
                        os.close(fd)
                except OSError as x:
                    # EEXIST: another session created it in the meantime
                    if x.errno != errno.EEXIST:
                        raise RepositoryError(self.repo, "OSError on count file create: %s" % x)
            try:
                self.count = max(self.count, self.cnt_read())
            except ValueError:
                logger.error("Corrupt count file '%s'! Trying to recover..." % (self.cntfn))
            except OSError as x:
                raise RepositoryError(self.repo, "OSError on count file '%s' access!" % (self.cntfn))
            self.cnt_write()
            # Setup session file
            try:
                fd = os.open(self.fn, os.O_EXCL | os.O_CREAT | os.O_WRONLY)
                try:
                    os.write(fd, pickle.dumps(Set()))
                finally:
                    os.close(fd)
            except OSError as x:
                raise RepositoryError(self.repo, "Error on session file '%s' creation: %s" % (self.fn, x))
            if session_lock_refresher is None:
                try:
                    os.close(os.open(self.gfn, os.O_EXCL | os.O_CREAT | os.O_WRONLY))
                except OSError as x:
                    raise RepositoryError(self.repo, "Error on session file '%s' creation: %s" % (self.gfn, x))
                session_lock_refresher = SessionLockRefresher(self.session_name, self.sdir, self.gfn, self.repo)
                session_lock_refresher.start()
            else:
                session_lock_refresher.addRepo(self.repo)
            self.session_write()
        finally:
            self.global_lock_release()

    def shutdown(self):
        """Shutdown the thread and locking system (on ganga shutdown or repo error)"""
        self.locked = Set()
        try:
            os.unlink(self.fn)
        except OSError as x:
            logger.debug("Session file '%s' was deleted already or removal failed: %s" % (self.fn, x))

    # Global lock function
    def global_lock_setup(self):
        """Create the global lock file if needed and open it; sets self.lockfn and self.lockfd."""
        self.lockfn = os.path.join(self.sdir, "global_lock")
        try:
            open(self.lockfn, "w").close()  # create file (does not interfere with existing sessions)
            self.lockfd = os.open(self.lockfn, os.O_RDWR)
        except IOError as x:
            raise RepositoryError(self.repo, "Could not create lock file '%s': %s" % (self.lockfn, x))
        except OSError as x:
            raise RepositoryError(self.repo, "Could not open lock file '%s': %s" % (self.lockfn, x))

    def global_lock_acquire(self):
        """Take the exclusive global lock (blocks until available)."""
        try:
            fcntl.lockf(self.lockfd, fcntl.LOCK_EX)
        except IOError as x:
            raise RepositoryError(self.repo, "IOError on lock ('%s'): %s" % (self.lockfn, x))

    def global_lock_release(self):
        """Release the global lock."""
        try:
            fcntl.lockf(self.lockfd, fcntl.LOCK_UN)
        except IOError as x:
            raise RepositoryError(self.repo, "IOError on unlock ('%s'): %s" % (self.lockfn, x))

    def _load_locked_ids(self, sf):
        """Return the unpickled contents of lock file ``sf``.

        The file is read through the same descriptor that holds the shared
        lock (no lock on AFS), and the descriptor is always closed.
        Propagates any OSError/IOError/unpickling error to the caller.
        NOTE(review): pickle data is loaded from a directory shared between
        sessions; it must only be writable by trusted users.
        """
        fd = os.open(sf, os.O_RDONLY)
        try:
            if not self.afs:
                fcntl.lockf(fd, fcntl.LOCK_SH)  # ONLY NFS
            try:
                return pickle.loads(os.read(fd, 104857600))
            finally:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_UN)  # ONLY NFS
        finally:
            os.close(fd)

    # Session read-write functions
    def session_read(self, fn):
        """ Reads a session file and returns a set of IDs locked by that session.
        The global lock MUST be held for this function to work, although on NFS additional
        locking is done
        Returns an empty set if the file does not exist or cannot be unpickled.
        Raises RepositoryError if severe access problems occur (corruption otherwise!) """
        try:
            fd = os.open(fn, os.O_RDONLY)  # This can fail (thats OK, file deleted in the meantime)
            try:
                if not self.afs:  # additional locking for NFS
                    fcntl.lockf(fd, fcntl.LOCK_SH)
                try:
                    # read up to 100 MB (that is more than enough...)
                    # NOTE(review): unpickles data from a shared directory.
                    return pickle.loads(os.read(fd, 104857600))
                except Exception as x:
                    logger.warning("corrupt or inaccessible session file '%s' - ignoring it (Exception %s %s)." % (fn, x.__class__.__name__, x))
            finally:
                try:
                    if not self.afs:  # additional locking for NFS
                        fcntl.lockf(fd, fcntl.LOCK_UN)
                finally:
                    os.close(fd)
        except OSError as x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "Error on session file access '%s': %s" % (fn, x))
        except IOError as x:
            # lockf failures; reported the same way session_write does
            raise RepositoryError(self.repo, "Error on session file locking '%s': %s" % (fn, x))
        return Set()

    def session_write(self):
        """ Writes the locked set to the session file.
        The global lock MUST be held for this function to work, although on NFS additional
        locking is done
        Raises RepositoryError if session file is inaccessible """
        try:
            # If this fails, we want to shutdown the repository (corruption possible)
            fd = os.open(self.fn, os.O_WRONLY)
            try:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_EX)
                os.write(fd, pickle.dumps(self.locked))
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_UN)
            finally:
                os.close(fd)  # always close, also when locking or writing failed
        except OSError as x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "Error on session file access '%s': %s" % (self.fn, x))
            else:
                raise RepositoryError(self.repo, "Own session file not found! Possibly deleted by another ganga session.\n"
                    "Possible reasons could be that this computer has a very high load, or that the system clocks on computers running Ganga are not synchronized.\n"
                    "On computers with very high load and on network filesystems, try to avoid running concurrent ganga sessions for long.")
        except IOError as x:
            raise RepositoryError(self.repo, "Error on session file locking '%s': %s" % (self.fn, x))

    # counter read-write functions
    def cnt_read(self):
        """ Tries to read the counter file.
        Raises ValueError (invalid contents)
        Raises OSError (no access/does not exist)
        Raises RepositoryError (fatal)
        """
        try:
            fd = os.open(self.cntfn, os.O_RDONLY)
            try:
                if not self.afs:  # additional locking for NFS
                    fcntl.lockf(fd, fcntl.LOCK_SH)
                # 100 bytes should be enough for any ID. Can raise ValueError
                return int(os.read(fd, 100).split(b"\n")[0])
            finally:
                try:
                    if not self.afs:  # additional locking for NFS
                        fcntl.lockf(fd, fcntl.LOCK_UN)
                finally:
                    os.close(fd)
        except OSError as x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "OSError on count file '%s' read: %s" % (self.cntfn, x))
            else:
                raise  # This can be a recoverable error, depending on where it occurs
        except IOError as x:
            raise RepositoryError(self.repo, "Locking error on count file '%s' read: %s" % (self.cntfn, x))

    def cnt_write(self):
        """ Writes the counter to the counter file.
        The global lock MUST be held for this function to work correctly
        Raises RepositoryError if count file is inaccessible """
        try:
            # If this fails, we want to shutdown the repository (corruption possible)
            fd = os.open(self.cntfn, os.O_WRONLY)
            try:
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_EX)
                # The trailing newline terminates the number, so leftovers of
                # an older, longer value are ignored by cnt_read.
                os.write(fd, str(self.count).encode("ascii") + b"\n")
                if not self.afs:
                    fcntl.lockf(fd, fcntl.LOCK_UN)
            finally:
                os.close(fd)  # always close, also when locking or writing failed
        except OSError as x:
            if x.errno != errno.ENOENT:
                raise RepositoryError(self.repo, "OSError on count file '%s' write: %s" % (self.cntfn, x))
            else:
                raise RepositoryError(self.repo, "Count file '%s' not found! Repository was modified externally!" % (self.cntfn))
        except IOError as x:
            raise RepositoryError(self.repo, "Locking error on count file '%s' write: %s" % (self.cntfn, x))

    # "User" functions
    def make_new_ids(self, n):
        """ Locks the next n available ids and returns them as a list
        Raise RepositoryError on fatal error"""
        self.global_lock_acquire()
        try:
            # Actualize count
            try:
                newcount = self.cnt_read()
            except ValueError:
                logger.warning("Corrupt job counter (possibly due to crash of another session)! Trying to recover...")
                newcount = self.count
            except OSError:
                raise RepositoryError(self.repo, "Job counter deleted! External modification to repository!")
            if not newcount >= self.count:
                logger.warning("Internal counter increased - probably the count file was deleted.")
                newcount = self.count
            if self.locked and max(self.locked) >= newcount:  # someone used force_ids (for example old repository imports)
                newcount = max(self.locked) + 1
            ids = list(range(newcount, newcount + n))
            self.locked.update(ids)
            self.count = newcount + n
            self.cnt_write()
            self.session_write()
            return ids
        finally:
            self.global_lock_release()

    def lock_ids(self, ids):
        """Try to lock the given ids; returns the list of ids that no other session holds
        (these are now locked by this session).
        Raises RepositoryError on fatal error"""
        ids = Set(ids)
        self.global_lock_acquire()
        try:
            try:
                sessions = [sn for sn in os.listdir(self.sdir) if sn.endswith(self.name + ".locks")]
            except OSError as x:
                raise RepositoryError(self.repo, "Could not list session directory '%s'!" % (self.sdir))

            # Collect everything locked by the other sessions of this registry.
            slocked = Set()
            for session in sessions:
                sf = os.path.join(self.sdir, session)
                if sf == self.fn:
                    continue
                slocked.update(self.session_read(sf))
            ids.difference_update(slocked)
            self.locked.update(ids)
            self.session_write()
            return list(ids)
        finally:
            self.global_lock_release()

    def release_ids(self, ids):
        """Release the given ids and return them as a list (releasing never fails for single ids)."""
        self.global_lock_acquire()
        try:
            self.locked.difference_update(ids)
            self.session_write()
            return list(ids)
        finally:
            self.global_lock_release()

    def check(self):
        """Consistency check used by the locking tests: asserts that the counter file
        did not fall behind and that no id is locked by two sessions at once."""
        self.global_lock_acquire()
        try:
            f = open(self.cntfn)
            try:
                newcount = int(f.readline())
            finally:
                f.close()
            assert newcount >= self.count
            prevnames = Set()
            for session in os.listdir(self.sdir):
                if not session.endswith(self.name + ".locks"):
                    continue
                try:
                    names = self._load_locked_ids(os.path.join(self.sdir, session))
                except Exception as x:
                    logger.warning("CHECKER: session file %s corrupted: %s %s" % (session, x.__class__.__name__, x))
                    continue
                if names & prevnames:
                    print("Double-locked stuff: %s" % (names & prevnames,))
                    assert False
                # update() exists on both sets.Set and the built-in set
                # (union_update only exists on sets.Set).
                prevnames.update(names)
        finally:
            self.global_lock_release()

    def get_lock_session(self, id):
        """get_lock_session(id)
        Tries to determine the session that holds the lock on id for information purposes, and return an informative string.
        Returns None on failure
        """
        self.global_lock_acquire()
        try:
            sessions = [s for s in os.listdir(self.sdir) if s.endswith(self.name + ".locks")]
            for session in sessions:
                try:
                    names = self._load_locked_ids(os.path.join(self.sdir, session))
                    if id in names:
                        return self.session_to_info(session)
                except Exception:
                    # purely informational: skip unreadable lock files
                    continue
            return None
        finally:
            self.global_lock_release()

    def get_other_sessions(self):
        """get_session_list()
        Tries to determine the other sessions that are active and returns an informative string for each of them.
        """
        self.global_lock_acquire()
        try:
            sessions = [s for s in os.listdir(self.sdir)
                        if s.endswith(".session") and not os.path.join(self.sdir, s) == self.gfn]
            return [self.session_to_info(session) for session in sessions]
        finally:
            self.global_lock_release()

    def reap_locks(self):
        """reap_locks() --> True/False
        Remotely clear all foreign locks from the session.
        WARNING: This is not nice.
        Returns True on success, False on error."""
        failed = False
        self.global_lock_acquire()
        try:
            sessions = [s for s in os.listdir(self.sdir)
                        if s.endswith(".session") and not os.path.join(self.sdir, s) == self.gfn]
            for session in sessions:
                try:
                    os.unlink(os.path.join(self.sdir, session))
                except OSError:
                    failed = True
            return not failed
        finally:
            self.global_lock_release()

    def session_to_info(self, session):
        """Turn a session file name ("<host>.<ms timestamp>.<pid>.session") into
        "<host> (pid <pid>) since <ctime>".  Lock file names, which carry
        ".<name>.locks" after the session name, are accepted as well.
        Returns the name unchanged if it cannot be parsed."""
        marker = ".session"
        if marker in session:
            # strip anything after the session name, e.g. ".<name>.locks"
            si = (session.split(marker)[0] + marker).split(".")
        else:
            si = session.split(".")
        try:
            return "%s (pid %s) since %s" % (".".join(si[:-3]), si[-2], time.ctime(int(si[-3]) // 1000))
        except Exception:
            return session
503
def test1():
    """Locking stress test: repeatedly lock and release random ids and
    check that no id is ever locked twice.  Runs until interrupted; start
    several instances in the same directory."""
    # The constructor takes (repo, root, name); no repository object exists
    # in this stand-alone test, so None is passed.
    slm = SessionLockManager(None, "locktest", "tester")
    # startup() opens the global lock file that every other call relies on.
    slm.startup()
    while True:
        print("lock --- %s" % (slm.lock_ids(random.sample(range(100), 3)),))
        print("unlock--- %s" % (slm.release_ids(random.sample(range(100), 3)),))
        slm.check()
510
def test2():
    """Id allocation stress test: repeatedly request 1-9 new ids and check
    the lock files for consistency.  Runs until interrupted; start several
    instances in the same directory."""
    # The constructor takes (repo, root, name); no repository object exists
    # in this stand-alone test, so None is passed.
    slm = SessionLockManager(None, "locktest", "tester")
    # startup() opens the global lock file that every other call relies on.
    slm.startup()
    while True:
        n = random.randint(1, 9)
        print("get %i ids --- %s" % (n, slm.make_new_ids(n)))
        slm.check()
import random

if __name__ == "__main__":
    import sys
    # Map the command-line selector to the stress test to run.
    selected = None
    if len(sys.argv) > 1:
        selected = {"1": test1, "2": test2}.get(sys.argv[1])
    if selected is None:
        # Missing or unknown selector: show usage instead of exiting silently.
        print("Usage: python SessionLock.py {1|2}")
        sys.exit(-1)
    selected()