Package Ganga :: Package Utility :: Package external :: Package ARDAMDClient :: Module mdstandalone
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Utility.external.ARDAMDClient.mdstandalone

  1  ################################################################################ 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: mdstandalone.py,v 1.1 2008-07-17 16:41:02 moscicki Exp $ 
  5  ################################################################################ 
  6  import os 
  7  import re 
  8  import errno 
  9  import time 
 10  import mdinterface 
 11  from mdinterface import CommandException, MDInterface 
 12  from mdparser import MDParser 
 13  from mdtable import MDTable 
 14  from diskutils import RLock, getLast, readLast, write, remove 
 15   
 16  DEBUG = False 
 17   
def visitLocksRemove(arg, directory, files):
    """Directory-walk visitor that deletes the ``LOCK`` file of *directory*.

    arg       -- age threshold as an absolute timestamp; when positive, a
                 lock modified after that instant is left untouched,
                 otherwise every lock is removed.
    directory -- directory currently being visited.
    files     -- names found in that directory.
    """
    threshold = arg
    lock_paths = [os.path.join(directory, entry)
                  for entry in files if entry == 'LOCK']
    for lock_path in lock_paths:
        modified = os.stat(lock_path).st_mtime
        # Equivalent to skipping locks that are newer than a positive threshold
        if threshold <= 0 or modified <= threshold:
            os.remove(lock_path)
28 29
def visitLocksList(arg, directory, files):
    """Directory-walk visitor that records which directories hold a ``LOCK``.

    arg       -- tuple ``(age, root, lines)``: ``age`` is an absolute
                 timestamp (when positive, locks modified after it are
                 skipped), ``root`` is the directory the walk started from
                 and ``lines`` is the list the results are appended to.
    directory -- directory currently being visited.
    files     -- names found in that directory.

    Each result is the locked directory expressed relative to ``root`` with
    a leading separator; the root itself is reported as the bare separator.
    """
    age, root, lines = arg
    for name in files:
        if name != 'LOCK':
            continue
        filename = os.path.join(directory, name)
        mtime = os.stat(filename).st_mtime
        if age > 0 and mtime > age:
            continue
        # The previous slicing (filename[len(root)-1:]) only worked when root
        # ended with a separator; relpath is correct with or without one.
        relative = os.path.relpath(directory, root)
        if relative == os.curdir:
            line = os.sep
        else:
            line = os.path.normpath(os.sep + relative)
        lines.append(line)
46 47
48 -class MDStandalone (mdinterface.MDInterface):
49 - def __mkdir(self, newdir):
50 """ works the way a good mkdir should :) 51 - already exists, silently complete 52 - regular file in the way, raise an exception 53 - parent directory(ies) does not exist, make them as well 54 """ 55 if os.path.isdir(newdir): 56 pass 57 elif os.path.isfile(newdir): 58 raise mdinterface.CommandException(16, "Directory exists") 59 try: 60 os.makedirs(newdir) 61 except OSError, e: 62 if e[0] == errno.EPERM or e[0] == errno.EACCES: 63 raise mdinterface.CommandException(4, "Could not create dir: Permission denied") 64 else: 65 raise mdinterface.CommandException(16, "Directory existed")
66 67
68 - def __init__(self, root, 69 blocklength = 1000, 70 cache_size = 100000, 71 tries_limit = 200):
72 self.root = os.path.normpath(root) 73 self.blocklength = blocklength 74 self.cache_size = cache_size 75 self.tries_limit = tries_limit 76 self.rows=[] 77 self.tables = {} 78 self.currentDir = '/' 79 self.loaded_tables = {} 80 self.upload_cmd = {} 81 self.transaction_in_prgs = False 82 self.sequence_reserve = {}
83 84
85 - def __isDir(self, dirname):
86 name = self.__systemPath(dirname) 87 return os.path.isdir(name)
88 89
90 - def __absolutePath(self, table):
91 if DEBUG: print '__absolutePath for ', table 92 if len(table) and table[0] == '/': 93 prefix = '' 94 else: 95 prefix = self.currentDir + '/' 96 table = os.path.normpath(prefix + table).replace(os.sep, '/') 97 if DEBUG: print '__absolutePath: returning', table 98 return table
99 100
101 - def __systemPath(self, path):
102 path = os.path.normpath(self.root + '/' + path) 103 if DEBUG: print '__systemPath returns: ', path 104 return path
105 106
107 - def __initTransaction(self):
108 if not self.transaction_in_prgs: 109 self.tables = {}
110 111
112 - def __loadTable(self, table, update = True):
113 table = self.__absolutePath(table) 114 if DEBUG: print 'Loading table', table 115 if table in self.tables: 116 mdtable = self.tables[table] 117 else: 118 mdtable = self.__getTable(table) 119 if not mdtable.lock(): 120 raise CommandException(9, 'Could not acquire table lock %s' % table) 121 self.tables[table] = mdtable 122 mdtable.load() 123 return mdtable, table
124 125
126 - def __saveTable(self, table):
127 if not self.transaction_in_prgs: 128 if DEBUG: print 'Saving table', table 129 mdtable = self.loaded_tables[table] 130 mdtable.save()
131 132
133 - def __addEntry(self, mdtable, entry, attrs, values):
134 if mdtable.entries.has_key(entry): 135 raise CommandException(15, "Entry exists") 136 # new entry 137 e=[''] * (len(mdtable.attributes)+1) 138 e[0]=entry 139 for i in range(0, len(attrs)): 140 e[mdtable.attributeDict[attrs[i]]+1] = values[i] 141 mdtable.entries.append(e)
142 143
144 - def __getTable(self, table):
145 dirname = self.__systemPath(table) 146 if not os.path.isdir(dirname): 147 raise CommandException(1, 'File or directory does not exist') 148 if table not in self.loaded_tables: 149 self.loaded_tables[table] = MDTable(dirname, 150 blocklength = self.blocklength, 151 cache_size = self.cache_size, 152 tries_limit = self.tries_limit) 153 return self.loaded_tables[table]
154 155
156 - def releaseAllLocks(self):
157 if not self.transaction_in_prgs: 158 for table in self.tables: 159 self.tables[table].unlock()
160 161
162 - def eot(self):
163 return len(self.rows) == 0
164 165
166 - def addEntries(self, entries):
167 self.__initTransaction() 168 try: 169 tablename = os.path.dirname(entries[0]) 170 mdtable, tablename = self.__loadTable(tablename, True) 171 emptyAttr = [''] * len(mdtable.attributes) 172 for entry in entries: 173 entry_key = os.path.basename(entry) 174 if mdtable.entries.has_key(entry_key): 175 raise CommandException(15, "Entry exists") 176 row = [entry_key] 177 row.extend(emptyAttr) 178 mdtable.entries.append(row) 179 self.__saveTable(tablename) 180 finally: 181 self.releaseAllLocks()
182 183
184 - def addAttr(self, file, name, t):
185 self.__initTransaction() 186 try: 187 dirname = self.__absolutePath(file) 188 if not self.__isDir(dirname): 189 dirname = os.path.dirname(dirname) 190 mdtable, tablename = self.__loadTable(dirname, True) 191 if mdtable.attributeDict.has_key(name): 192 raise CommandException(15, "Attribute exists") 193 mdtable.attributes.append([name, t]) 194 mdtable.update() 195 e_len = len(mdtable.attributes) 196 for i in range(0, len(mdtable.entries)): 197 e = mdtable.entries[i] 198 e[e_len:] = [''] 199 mdtable.entries[i] = e 200 self.__saveTable(tablename) 201 finally: 202 self.releaseAllLocks()
203 204
205 - def listAttr(self, file):
206 self.__initTransaction() 207 try: 208 dirname = self.__absolutePath(file) 209 if not self.__isDir(dirname): 210 dirname = os.path.dirname(dirname) 211 mdtable, dirname = self.__loadTable(dirname) 212 names = [] 213 types = [] 214 for i in range(0, len(mdtable.attributes)): 215 pair = mdtable.attributes[i] 216 names.append(pair[0]) 217 types.append(pair[1]) 218 return names, types 219 finally: 220 self.releaseAllLocks()
221 222
223 - def addEntry(self, file, keys, values):
224 self.__initTransaction() 225 try: 226 tablename, entry = os.path.split(file) 227 mdtable, tablename = self.__loadTable(tablename, True) 228 self.__addEntry(mdtable, entry, keys, values) 229 self.__saveTable(tablename) 230 finally: 231 self.releaseAllLocks()
232 233
234 - def getattr(self, file, attrs):
235 self.__initTransaction() 236 try: 237 if self.__isDir(file): 238 tablename = file 239 entry = '*' 240 else: 241 tablename, entry = os.path.split(file) 242 mdtable, tablename = self.__loadTable(tablename) 243 self.rows = [] 244 pattern = entry.replace('*', '.*') 245 pattern = pattern.replace('?', '.') 246 for i in range(0, len(mdtable.entries)): 247 e = mdtable.entries[i] 248 r = re.match(pattern, e[0]) 249 if r and r.group(0) == e[0]: 250 row = [] 251 row.append(e[0]) 252 for j in range(0, len(attrs)): 253 index = mdtable.attributeDict[attrs[j]] 254 row.append(e[index+1]) 255 self.rows.append(row) 256 finally: 257 self.releaseAllLocks()
258 259
260 - def getEntry(self):
261 e = self.rows.pop(0) 262 # entry, row = (e[0], e[1:]) 263 return (e[0], e[1:])
264 265
266 - def createDir(self, dirname):
267 if DEBUG: print 'createDir ', dirname 268 newdir = self.__systemPath(self.__absolutePath(dirname)) 269 self.__mkdir(newdir)
270 271
272 - def listEntries(self, dirname):
273 if DEBUG: print 'listEntries ', dirname 274 self.__initTransaction() 275 try: 276 dirname = self.__absolutePath(dirname) 277 if self.__isDir(dirname): 278 entry = "*" 279 else: 280 dirname, entry = os.path.split(dirname) 281 282 # List directories, first 283 dirs = os.listdir(self.__systemPath(dirname)) 284 for d in dirs: 285 if self.__isDir(dirname + '/' +d): 286 row = [] 287 row.append(dirname + '/' +d) 288 row.append('collection') 289 self.rows.append(row) 290 291 # Now list entries 292 mdtable, dirname = self.__loadTable(dirname) 293 pattern = entry.replace('*', '.*') 294 pattern = pattern.replace('?', '.') 295 for i in range(0, len(mdtable.entries)): 296 e = mdtable.entries[i] 297 r = re.match(pattern, e[0]) 298 if r and r.group(0) == e[0]: 299 row = [] 300 row.append(e[0]) 301 row.append('entry') 302 self.rows.append(row) 303 finally: 304 self.releaseAllLocks()
305 306
307 - def removeAttr(self, file, name):
308 if DEBUG: print 'removeAttr ', file, name 309 self.__initTransaction() 310 try: 311 dirname = self.__absolutePath(file) 312 if not self.__isDir(dirname): 313 dirname = os.path.dirname(dirname) 314 mdtable, dirname = self.__loadTable(dirname, True) 315 if not mdtable.attributeDict.has_key(name): 316 raise CommandException(10, "No such key") 317 iattr = mdtable.attributeDict[name] 318 del mdtable.attributes[iattr] 319 mdtable.update() 320 for i in range(0, len(mdtable.entries)): 321 e = mdtable.entries[i] 322 del e[iattr+1] 323 mdtable.entries[i] = e 324 self.__saveTable(dirname) 325 finally: 326 self.releaseAllLocks()
327 328
329 - def rm(self, path):
330 if DEBUG: print 'rm ', path 331 self.__initTransaction() 332 try: 333 dirname, entry = os.path.split(path) 334 mdtable, dirname = self.__loadTable(dirname, True) 335 pattern = entry.replace('*', '.*') 336 pattern = pattern.replace('?', '.') 337 for i in range(len(mdtable.entries)-1, -1, -1): 338 e = mdtable.entries[i] 339 r = re.match(pattern, e[0]) 340 if r and r.group(0) == e[0]: 341 del mdtable.entries[i] 342 self.__saveTable(dirname) 343 finally: 344 self.releaseAllLocks()
345 346
347 - def sequenceCreate(self, name, directory, increment=1, start=1):
348 name = self.__systemPath(self.__absolutePath(directory) + '/SEQ'+name) 349 lockfile = name+".LOCK" 350 lock = RLock(lockfile) 351 if lock.acquire(): 352 try: 353 try: 354 lfn = getLast(name) 355 except OSError: 356 pass 357 else: 358 raise CommandException(15, "Sequence exists: " + lfn) 359 entry = [str(start), str(increment), str(start)] 360 try: 361 write([entry], name) 362 except (OSError,IOError), e: 363 raise CommandException(17, "sequenceCreate error: " + str(e)) 364 finally: 365 lock.release() 366 else: 367 raise CommandException(9, "Cannot lock %s" % name)
368 369
370 - def sequenceNext(self, name, reserve = 1):
371 # with reserve > 1 this method returns 372 # reserved value for the counter if there is one available, 373 # otherwise it will reserve specified number of values and return the first one 374 def next(name, reserve): 375 dirname, seq = os.path.split(name) 376 name = self.__systemPath(self.__absolutePath(dirname) + '/SEQ'+seq) 377 lockfile = name+".LOCK" 378 lock = RLock(lockfile) 379 if lock.acquire(): 380 try: 381 try: 382 entries = readLast(name) 383 except (OSError,IOError), e: 384 raise CommandException(17, "Not a sequence: " + str(e)) 385 if not (len(entries)==1 and len(entries[0])==3): 386 raise CommandException(17, "Not a sequence: " + str(e)) 387 start, increment, now = entries[0] 388 newval = str(int(now) + int(increment) * reserve) 389 entry = [start, increment, newval] 390 try: 391 write([entry], name) 392 except (OSError,IOError), e: 393 raise CommandException(17, "sequenceNext error: " + str(e)) 394 return (now, increment, newval) 395 finally: 396 lock.release() 397 else: 398 raise CommandException(9, "Cannot lock %s" % name)
399 # use reserved values if possible (don't read the sequence file) 400 if reserve > 1: 401 try: 402 if name in self.sequence_reserve: 403 now, increment, reserved = self.sequence_reserve[name] 404 else: 405 now, increment, reserved = map(int, next(name, reserve)) 406 self.sequence_reserve[name] = [now, increment, reserved] 407 newval = now + increment 408 if newval >= reserved: 409 del self.sequence_reserve[name] 410 else: 411 self.sequence_reserve[name][0] = newval 412 return str(now) 413 except Exception, e: 414 raise CommandException(17, "sequenceNext error: " + str(e)) 415 else: 416 return next(name, reserve)[0] 417 418
419 - def sequenceRemove(self, name):
420 if name in self.sequence_reserve: 421 del self.sequence_reserve[name] 422 dirname, seq = os.path.split(name) 423 name = self.__systemPath(self.__absolutePath(dirname) + '/SEQ'+seq) 424 lockfile = name+".LOCK" 425 lock = RLock(lockfile) 426 if lock.acquire(): 427 try: 428 try: 429 remove(name) 430 except OSError, e: 431 raise CommandException(17, "sequenceRemove error: " + str(e)) 432 finally: 433 lock.release() 434 else: 435 raise CommandException(9, "Cannot lock %s" % name)
436 437
438 - def removeDir(self, dirname):
439 if DEBUG: print 'removeDir ', dirname 440 name = self.__systemPath(self.__absolutePath(dirname)) 441 try: 442 os.rmdir(name) 443 except OSError, e: 444 if e[0] == errno.ENOENT: 445 raise mdinterface.CommandException(1, "Directory not found") 446 else: 447 raise mdinterface.CommandException(11, "Directory not empty")
448 449
    def selectAttr(self, attrs, query):
        """Evaluate *query* and store the selected attribute values in self.rows.

        attrs -- sequence of ``'table:attribute'`` strings; the attribute
                 name ``FILE`` selects the entry name (column 0).
        query -- condition handed to MDParser.

        Result rows are fetched with getSelectAttrEntry().
        """
        if DEBUG: print 'selectAttr ', attrs, query
        self.__initTransaction()
        try:
            self.rows = []
            tList = []   # absolute table name for each requested attribute
            aList = []   # attribute name for each requested attribute
            for a in attrs:
                table, attr = a.split(':', 1)
                mdtable, table = self.__loadTable(table)
                tList.append(table)
                aList.append(attr)

            parser = MDParser(query, self.tables,
                              self.currentDir, self.__loadTable)

            # First, only load necessary tables
            # NOTE(review): presumably this first evaluation makes the parser
            # load the tables named in the query via __loadTable - confirm.
            parser.parseWhereClause()
            allTables = []
            iterations = 0
            # The number of iterations is the product of the entry counts of
            # all loaded tables.
            # NOTE(review): a table with zero entries resets the product when
            # it comes first but zeroes it when it comes later, so the result
            # depends on dict order - confirm whether that is intended.
            for k, v in self.tables.iteritems():
                v.initRowPointer()
                allTables.append(v)
                if iterations == 0:
                    iterations = len(v.entries)
                else:
                    iterations = iterations * len(v.entries)

            for i in range(0, iterations):
                if parser.parseWhereClause():
                    row = []
                    for j in range(0, len(attrs)):
                        t = self.tables[tList[j]]
                        if aList[j] == "FILE":
                            index = 0
                        else:
                            index = t.attributeDict[aList[j]]+1
                        row.append(t.entries[t.currentRow][index])
                    self.rows.append(row)
                # Advance the row pointers like an odometer: bump the first
                # table, and on wrap-around reset it and carry to the next.
                for j in range (0, len(allTables)):
                    if allTables[j].currentRow < 0:
                        # presumably marks a table without rows - TODO confirm
                        continue
                    allTables[j].currentRow = allTables[j].currentRow + 1
                    if allTables[j].currentRow < len(allTables[j].entries):
                        break
                    allTables[j].currentRow = 0
        finally:
            self.releaseAllLocks()
498 499
500 - def getSelectAttrEntry(self):
501 # attributes = self.rows.pop(0) 502 return self.rows.pop(0)
503 504
    def updateAttr(self, pattern, updateExpr, condition):
        """Update attributes of the entries matching *pattern* and *condition*.

        pattern    -- directory (all entries) or path whose last component
                      is a pattern using ``*`` and ``?`` as wildcards.
        updateExpr -- sequence of update clauses; each is split into the
                      attribute to set and the expression to evaluate by
                      self.splitUpdateClause (not defined in this class).
        condition  -- where-clause handed to MDParser.
        """
        if DEBUG: print 'updateAttr ', pattern, updateExpr, condition
        self.__initTransaction()
        try:
            dirname = self.__absolutePath(pattern)
            if self.__isDir(dirname):
                entry = "*"
            else:
                dirname, entry = os.path.split(dirname)
            mdtable, tablename = self.__loadTable(dirname, True)
            # Translate the wildcards into a regular expression.
            # NOTE(review): other regex metacharacters in the entry name are
            # not escaped, so e.g. '.' matches any character.
            pattern = entry.replace('*', '.*')
            pattern = pattern.replace('?', '.')

            parser = MDParser(condition, self.tables,
                              tablename, self.__loadTable)

            # First, only load necessary tables
            parser.parseWhereClause()

            # Prepare update expressions
            expressions = []
            for e in updateExpr:
                var, exp = self.splitUpdateClause(e)
                index = mdtable.attributeDict[var]+1
                p = MDParser(exp, self.tables, tablename,
                             self.__loadTable)
                # NOTE(review): this re-evaluates the condition parser, not
                # the freshly created expression parser p - possibly a
                # copy-paste slip; confirm against MDParser.
                parser.parseWhereClause()
                expressions.append( (index, exp, p) )

            # Find all tables which have entries and list them in allTables,
            # and count the number of iterations we need to do
            allTables = []
            iterations = 0
            for k, v in self.tables.iteritems():
                v.initRowPointer()
                if len(v.entries) == 0:
                    continue
                allTables.append(v)
                if iterations == 0:
                    iterations = len(v.entries)
                else:
                    iterations = iterations * len(v.entries)

            # Iterate over all combinations of table entries, check whether
            # the update clause is fulfilled and do the update
            for i in range(0, iterations):
                e = mdtable.entries[mdtable.currentRow]
                r = re.match(pattern, e[0])
                # Only accept a match that covers the whole entry name
                if r and r.group(0) == e[0]:
                    if parser.parseWhereClause():
                        for j in range (0, len(expressions)):
                            index, exp, p = expressions[j]
                            value = p.parseStatement()
                            e[index] = value
                        mdtable.entries[mdtable.currentRow] = e
                # Advance the row pointers like an odometer: bump the first
                # table, and on wrap-around reset it and carry to the next.
                for j in range (0, len(allTables)):
                    allTables[j].currentRow = allTables[j].currentRow + 1
                    if allTables[j].currentRow < len(allTables[j].entries):
                        break
                    else:
                        allTables[j].currentRow = 0
            self.__saveTable(tablename)
        finally:
            self.releaseAllLocks()
569 570
571 - def setAttr(self, file, keys, values):
572 self.__initTransaction() 573 tablename, entry = os.path.split(file) 574 mdtable, tablename = self.__loadTable(tablename, True) 575 try: 576 pattern = entry.replace('*', '.*') 577 pattern = pattern.replace('?', '.') 578 for i in range(0, len(mdtable.entries)): 579 e = mdtable.entries[i] 580 r = re.match(pattern, e[0]) 581 if r and r.group(0) == e[0]: 582 for j in range(0, len(keys)): 583 index = mdtable.attributeDict[keys[j]] 584 e[index+1] = values[j] 585 mdtable.entries[i] = e 586 self.__saveTable(tablename) 587 finally: 588 self.releaseAllLocks()
589 590
591 - def pwd(self):
592 return self.currentDir
593 594
595 - def cd(self, dir):
596 dir = self.__absolutePath(dir) 597 if not self.__isDir(dir): 598 raise CommandException(1, "Not a directory") 599 self.currentDir = dir
600 601
602 - def upload(self, collection, attributes):
603 self.transaction() # start transaction 604 mdtable, tablename = self.__loadTable(collection, True) 605 self.upload_cmd['collection'] = tablename 606 self.upload_cmd['attributes'] = attributes
607 608
609 - def put(self, file, values):
610 if not self.transaction_in_prgs: 611 raise CommandException(9, "Could not abort No transaction in progress. Command was: abort") 612 elif len(values)!=len(self.upload_cmd['attributes']): 613 raise CommandException(3, "Illegal command") 614 tablename, entry = os.path.split(file) 615 if tablename: 616 assert(tablename == self.upload_cmd['collection']) 617 mdtable = self.tables[self.upload_cmd['collection']] 618 attrs = self.upload_cmd['attributes'] 619 self.__addEntry(mdtable, entry, attrs, values)
620 621
622 - def abort(self):
623 if not self.transaction_in_prgs: 624 raise CommandException(9, "Could not abort No transaction in progress. Command was: abort") 625 try: 626 self.transaction_in_prgs = False 627 self.upload_cmd = {} 628 finally: 629 self.releaseAllLocks()
630 631
632 - def commit(self):
633 if not self.transaction_in_prgs: 634 raise CommandException(9, "Could not commit No transaction in progress. Command was: commit") 635 try: 636 self.transaction_in_prgs = False 637 for table in self.tables: 638 self.__saveTable(table) 639 finally: 640 self.releaseAllLocks()
641 642
643 - def transaction(self):
644 self.__initTransaction() 645 if not self.transaction_in_prgs: 646 self.transaction_in_prgs = True
647 648 649 # Removes all locks, even those of other processes recursively 650 # starting at the root directory by deleting the LOCK files 651 # If age is given it restricts the deletion operation on most 652 # operating systems to locks older than age seconds
653 - def removeAllLocks(self, age = -1):
654 if age > 0: 655 age = time.time()-age 656 os.path.walk(self.root, visitLocksRemove, age);
657 658 659 # Returns a list with all currently held locks (locks older than age if age>0)
660 - def listAllLocks(self, age = -1):
661 if age > 0: 662 age = time.time()-age 663 664 lines = [] 665 os.path.walk(self.root, visitLocksList, (age, self.root, lines)); 666 return lines
667