
Source Code for Module Ganga.Core.JobRepository.ARDA

################################################################################
# Ganga Project. http://cern.ch/ganga
#
# $Id: ARDA.py,v 1.2 2008-09-03 08:19:51 asaroka Exp $
################################################################################

__version__ = "2.2"

import os
import shutil
import binascii
import types
import threading
import time
import Ganga.Utility.external.ARDAMDClient.mdclient
import Ganga.Utility.external.ARDAMDClient.mdstandalone
import Ganga.Utility.external.ARDAMDClient.mdparser
import Ganga.Utility.Config
from Base import JobRepository
from Separator import Parser
from Ganga.Core.exceptions import RepositoryError, BulkOperationRepositoryError
from Ganga.Utility.external.ARDAMDClient.mdclient import MDClient
from Ganga.Utility.external.ARDAMDClient.mdstandalone import MDStandalone
from Ganga.Utility.external.ARDAMDClient.mdinterface import CommandException
from Ganga.Utility.external.ARDAMDClient.guid import newGuid
from Ganga.Utility.files import expandfilename

#########################
# logging setup - BEGIN #
#########################

import Ganga.Utility.logging
logger = Ganga.Utility.logging.getLogger(modulename=1)

# config for the logging module
logging_config = Ganga.Utility.Config.getConfig('Logging')

arda_logger_name = 'Ganga.Utility.external.ARDAMDClient'
arda_logger = Ganga.Utility.logging.getLogger(name=arda_logger_name)

# the debug flags are used in the external code (ARDA);
# therefore they must be switched correctly upon modification of
# the Ganga.Utility.external.ARDAMDClient logger

def switch_debug(opt, value):
    if opt.find(arda_logger_name) == 0:
        dbg = value == 'DEBUG'
        Ganga.Utility.external.ARDAMDClient.mdclient.DEBUG = dbg
        Ganga.Utility.external.ARDAMDClient.mdstandalone.DEBUG = dbg
        Ganga.Utility.external.ARDAMDClient.mdparser.DEBUG = dbg
        Ganga.Utility.external.ARDAMDClient.mdparser.DEBUGS = dbg
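
# Illustrative note (not part of the original source): switch_debug is driven
# by the 'Logging' configuration section, so setting the option named
# 'Ganga.Utility.external.ARDAMDClient' to 'DEBUG' at user or session level
# flips all four DEBUG flags above to True; any other level switches them off.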

# attach the action at user and session level
logging_config.attachUserHandler(None,switch_debug)
logging_config.attachSessionHandler(None,switch_debug)

# because session bootstrap of the logger may have already been performed
# force it here, according to the effective value of the logger
switch_debug(arda_logger_name,Ganga.Utility.logging.getLevelName(arda_logger.getEffectiveLevel()))

########################
# logging setup - DONE #
########################

##################################
# repository configuration       #
##################################
all_configs = {}
all_configs['LocalAMGA'] = Ganga.Utility.Config.makeConfig('LocalAMGA_Repository','Settings for the local AMGA job repository')
all_configs['LocalAMGA'].addOption('blocklength', 1000, 'maximum number of jobs stored in a block of local repository')
all_configs['LocalAMGA'].addOption('cache_size', 3, 'maximum size of memory (in blocks) that local repository can use for job caching')
all_configs['LocalAMGA'].addOption('tries_limit', 200, 'maximum number of attempts to write/move file or to acquire the table lock in local repository')
all_configs['LocalAMGA'].addOption('lock_timeout', 60, 'maximum time in seconds that limits lock validity for local repository')

all_configs['RemoteAMGA'] = Ganga.Utility.Config.makeConfig('RemoteAMGA_Repository','Settings for the remote AMGA job repository')
all_configs['RemoteAMGA'].addOption('host', 'gangamd.cern.ch', 'location of the AMGA metadata server used by the remote repository')
all_configs['RemoteAMGA'].addOption('port', 8822, 'port for secure connection to the remote repository')
all_configs['RemoteAMGA'].addOption('reqSSL', 1, 'flag for secure connection to the remote repository')
all_configs['RemoteAMGA'].addOption('login', '', 'login name to connect to the remote repository')

all_configs['Test'] = all_configs['LocalAMGA']
##################################
# repository configuration - END #
##################################


# options
USE_ORACLE_AS_REMOTE_BACKEND = True
USE_FOLDERS_FOR_SUBJOBS = False
USE_COUNTERS_FOR_SUBJOBS = True
USE_COMPRESSED_BLOBS = True
#USE_ONE_WRITING_CLIENT = False

if not USE_FOLDERS_FOR_SUBJOBS:
    USE_COUNTERS_FOR_SUBJOBS = True


# default schema
schema = [('id', 'int'),
          ('name', 'varchar(254)'),
          ('status', 'varchar(254)'),
          ('application', 'varchar(254)'),
          ('backend', 'varchar(254)')]

###########################################################################################
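
# Illustrative sketch (not part of the original source): options registered
# above can be read back through the same config mechanism, e.g.
#
#   cfg = Ganga.Utility.Config.getConfig('LocalAMGA_Repository')
#   blocklength = cfg['blocklength']   # 1000 unless overridden by the user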
class ARDARepositoryMixIn(JobRepository):
    _counterName = 'jobseq'

    _jobsTreeFldr = 'jobstree'
    _jobsTreeAttr = ('folders', 'text')

    _lock = ('lock', 'varchar(254)')             # internal lock
    _blob = ('blob', 'text')                     # job blob
    _id = ('id', 'int')                          # job id
    _counter = ('counter', 'int')                # split counter (shows next id); if > 0 the job has been split
    _subjobs = ('subjobs', 'text')               # list of subjob ids
    _isasubjob = ('isasubjob', 'varchar(254)')   # flag showing whether the job is a subjob (for selections)
    _compressed = ('compressed', 'varchar(254)') # flag showing that the blob has been compressed
    _istate = ('istate', 'varchar(254)')         # internal state (for advanced locking)
    _time = ('time', 'varchar(254)')             # time when the lock has been created

    #--------------------------------------------------------------------------------------
    def __init__(self, schema, role, streamer, tree_streamer, root_dir, init_schema):
        ## schema is the repository schema
        ## role is not used
        ## streamer is used to convert job objects to streams and vice versa
        ## tree_streamer is a streamer for the job tree
        ## root_dir is the path within the repository to the top level job folder
        ## init_schema is a switch used to control addition of default attributes to the schema
        JobRepository.__init__(self, schema, role, streamer, tree_streamer)
        if not os.path.isabs(root_dir):
            root_dir = os.path.join(os.sep, root_dir)
        self.root_dir = os.path.normpath(root_dir)

        # thread locking lock
        self._rep_lock = threading.RLock()

        # init all (job_cache, schema, root dir, job tree)
        self._initAll(init_schema)

    #--------------------------------------------------------------------------------------
    def _initAll(self, init_schema = True):
        # local cache of checked-out jobs
        self._job_cache = {}

        # init schema
        if init_schema:
            self._initSchema()

        # init root dir
        self._initDir(self.root_dir)

        ## folders support
        # check that root_dir has the _jobsTreeFldr folder
        jtreefldr = os.path.join(self.root_dir, self._jobsTreeFldr)
        self._createDirIfMissing(jtreefldr)

        # check that the _jobsTreeFldr folder has the required attributes
        self._createAttrIfMissing(jtreefldr, [self._jobsTreeAttr])

    #--------------------------------------------------------------------------------------
    def _initSchema(self):
        self._rep_lock.acquire(1)
        try:
            # assert that the job id is in the schema
            if self._id not in self.schema:
                self.schema.append(self._id)

            # extend the schema to support subjobs
            if self._counter not in self.schema:
                self.schema.append(self._counter)
            if self._subjobs not in self.schema:
                self.schema.append(self._subjobs)
            if self._isasubjob not in self.schema:
                self.schema.append(self._isasubjob)

            # extend the schema to support locks
            if self._lock not in self.schema:
                self.schema.append(self._lock)
            if self._istate not in self.schema:
                self.schema.append(self._istate)
            if self._time not in self.schema:
                self.schema.append(self._time)

            # extend the schema to support blob ids
            if self._compressed not in self.schema:
                self.schema.append(self._compressed)
            if self._blob not in self.schema:
                self.schema.append(self._blob)

            # subset of the schema to be used for standard job updates
            self._commit_schema = self.schema[:]
            self._commit_schema.remove(self._counter)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _initDir(self, path, schema = None, create_sequence = True):
        self._rep_lock.acquire(1)
        try:
            # check if the root dir exists; if not, create it
            self._createDirIfMissing(path)

            # check that the dir supports the schema
            if not schema:
                schema = self.schema
            self._createAttrIfMissing(path, schema)

            if create_sequence:
                # check that the dir has a job counter
                try:
                    self.sequenceCreate(self._counterName, path)
                except CommandException, e:
                    logger.debug(str(e))
                    # do nothing; assume that the sequence exists
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _removeAllEntries(self, path):
        # removes all entries (files) in the directory "path";
        # there must not be any subdirectories in the path,
        # otherwise the command fails
        self._rep_lock.acquire(1)
        try:
            try:
                self.rm(os.path.join(path, '*'))
            except CommandException, e:
                if e.errorCode == 1: # no entries
                    pass
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _removeAllAttributes(self, path, schema):
        # removes all attributes from the directory "path"
        self._rep_lock.acquire(1)
        try:
            if not schema:
                schema = self.schema
            self._initCommand()
            try:
                map(lambda x: self.removeAttr(path, x[0]), schema)
            finally:
                self._finalizeCommand()
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _isDirNotFoundError(self, e):
        # type(e) == CommandException
        return e.errorCode == 1

    #--------------------------------------------------------------------------------------
    def _isDirNotEmptyError(self, e):
        # type(e) == CommandException
        return e.errorCode == 11

    #--------------------------------------------------------------------------------------
    def _isNotASequenceError(self, e):
        # type(e) == CommandException
        return e.errorCode == 17

    #--------------------------------------------------------------------------------------
    def _forcedRemoveDir(self, path, schema = None, remove_sequence = True):
        # if the directory does not exist, silently exits
        self._rep_lock.acquire(1)
        try:
            try:
                self.removeDir(path)
            except CommandException, e:
                if self._isDirNotFoundError(e): # directory not found
                    return
                elif self._isDirNotEmptyError(e): # directory not empty
                    # rm sequence
                    if remove_sequence:
                        try:
                            self.sequenceRemove(self._getSequenceName(path))
                        except CommandException, e:
                            if not self._isNotASequenceError(e): # not a sequence
                                raise e

                    # rm all sub dirs
                    self.listEntries(path)
                    entr_exist = False
                    while not self.eot():
                        d, t = self.getEntry()
                        if t[0] == 'collection':
                            self._forcedRemoveDir(d, schema, remove_sequence)
                        else:
                            entr_exist = True

                    # rm all entries
                    if entr_exist:
                        self._removeAllEntries(path)

                    # rm all attributes
                    self._removeAllAttributes(path, schema)

                    # try to remove the (hopefully empty) dir again
                    try:
                        self.removeDir(path)
                    except CommandException, e:
                        if not self._isDirNotFoundError(e): # if the directory is not found (another process may have deleted it) don't raise an exception
                            raise e
                else:
                    raise e
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getSequenceName(self, path):
        return os.path.join(path, self._counterName)

    #--------------------------------------------------------------------------------------
    def _createDirIfMissing(self, path):
        self._rep_lock.acquire(1)
        try:
            # creates the dirs in path if they do not exist
            cwd = self.pwd()
            if not os.path.isabs(path):
                path = os.path.join(cwd, path)
            dd = [path]
            while 1:
                d = os.path.dirname(path)
                if d == path:
                    break
                dd.append(d)
                path = d
            dd2 = []
            for d in dd:
                try:
                    self.cd(d)
                except CommandException, e:
                    logger.debug(str(e))
                    dd2.insert(0,d)
                else:
                    try:
                        self.cd(cwd)
                    except CommandException, e:
                        logger.debug(str(e))
                        raise RepositoryError(e = e, msg = str(e))
                    break
            for d in dd2:
                try:
                    self.createDir(d)
                except CommandException, e:
                    if e.errorCode != 16:
                        logger.debug(str(e))
                        logger.debug(str(d))
                        raise RepositoryError(e = e, msg = str(e))
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _createAttrIfMissing(self, path, schema):
        self._rep_lock.acquire(1)
        try:
            # check that the last dir in the path supports the schema
            try:
                # NB: the local name 'types' shadows the global 'types' module
                # within this method
                attributes, types = self.listAttr(path)
                logger.debug('attributes: ' + str(attributes))
                logger.debug('types: ' + str(types) + '\n')
                # create attributes, if necessary
                self._initCommand()
                try:
                    for a, t in schema:
                        if a not in attributes:
                            self.addAttr(path, a, t)
                        else:
                            dbt = types[attributes.index(a)]
                            if t != dbt:
                                logger.debug("Attribute %s exists with a different type %s" % (a, dbt))
                finally:
                    self._finalizeCommand()
            except CommandException, e:
                if e.errorCode != 15:
                    logger.debug(str(e))
                    raise RepositoryError(e = e, msg = str(e))
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _convertDbJobId(self, dbid):
        try:
            return int(dbid)
        except:
            logger.warning("Wrong DB entry %s" % dbid)
            return dbid

    #--------------------------------------------------------------------------------------
    def _getFQID(self, j):
        self._rep_lock.acquire(1)
        try:
            fqn = [j.id]
            while j.master:
                j = j.master
                fqn.insert(0, j.id)
            return tuple(fqn)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getJobFolderName(self, fqid):
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if USE_FOLDERS_FOR_SUBJOBS:
                pp = map(lambda x: '.'.join([str(x), "subjobs"]), fqid[:-1])
                pp.insert(0, self.root_dir)
                return os.sep.join(pp)
            else:
                return self.root_dir
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getJobFileName(self, fqid):
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if USE_FOLDERS_FOR_SUBJOBS:
                basename = str(fqid[-1])
            else:
                basename = '.'.join(map(str, fqid))
            return os.path.join(self._getJobFolderName(fqid), basename)
        finally:
            self._rep_lock.release()
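
    # Illustrative example (not part of the original source): with the default
    # USE_FOLDERS_FOR_SUBJOBS = False, the subjob with fqid (12, 3) is stored
    # as the flat entry '<root_dir>/12.3'; with folders enabled it would live
    # at '<root_dir>/12.subjobs/3' instead.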

    #--------------------------------------------------------------------------------------
    def _getFQIDfromName(self, path):
        self._rep_lock.acquire(1)
        try:
            if USE_FOLDERS_FOR_SUBJOBS:
                nn = len(self.root_dir.split(os.sep))
                return tuple(map(lambda x: self._convertDbJobId(x.split('.')[0]), path.split(os.sep)[nn:]))
            else:
                return tuple(map(self._convertDbJobId, os.path.basename(path).split('.')))
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getSubJobPath(self, fqid):
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if USE_FOLDERS_FOR_SUBJOBS:
                pp = map(lambda x: '.'.join([str(x), "subjobs"]), fqid)
                pp.insert(0, self.root_dir)
                return os.sep.join(pp)
            else:
                return ''
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getCondition(self, fqid, forced_action = False):
        self._rep_lock.acquire(1)
        try:
            assert(type(fqid) in [types.TupleType, types.ListType])
            if forced_action:
                condition = ''
            else:
                if fqid in self._job_cache:
                    guid = self._job_cache[fqid]
                else:
                    guid = self.guid
                path = os.path.dirname(self._getJobFileName(fqid))
                condition = ':'.join([path, self._lock[0] + '="%s"'%guid])
            return condition
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _sortJobsByJobFolders(self, jobs):
        res = {}
        for j in jobs:
            path = self._getJobFolderName(self._getFQID(j))
            if path in res:
                res[path].append(j)
            else:
                res[path] = [j]
        return res

    #--------------------------------------------------------------------------------------
    def _text2pstr(self, text):
        import repr
        logger.debug('_text2pstr: %s', repr.repr(text))
        return binascii.unhexlify(text)

    #--------------------------------------------------------------------------------------
    def _pstr2text(self, pstr):
        import repr
        logger.debug('_pstr2text: %s', repr.repr(pstr))
        return binascii.hexlify(pstr)

    #--------------------------------------------------------------------------------------
    def _compress(self, v):
        import zlib
        return zlib.compress(v)

    #--------------------------------------------------------------------------------------
    def _decompress(self, v):
        import zlib
        return zlib.decompress(v)
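
    # Illustrative round-trip (not part of the original source): a job blob is
    # stored hex-encoded (and optionally zlib-compressed), so
    #   text = self._pstr2text(self._compress(pstr))
    # is undone by
    #   pstr = self._decompress(self._text2pstr(text))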

    #--------------------------------------------------------------------------------------
    def _getValues(self, job, timestamp, deep = True):
        self._rep_lock.acquire(1)
        try:
            def extractMD(r, mfqid):
                fqid = list(mfqid)
                jid = r[0]['data']['id']['data']
                fqid.append(jid) # fqid is the fqid of the current job
                fqid = tuple(fqid)
                res = {'fqid':fqid, 'metadata':{}}
                for k, t in self.schema:
                    if k == self._blob[0]:
                        v = repr(r[0])
                        if USE_COMPRESSED_BLOBS:
                            v = self._compress(v)
                    elif k == self._lock[0]:
                        v = self.guid
                    elif k == self._istate[0]:
                        v = '_'
                    elif k == self._time[0]:
                        v = timestamp
                    elif k == self._subjobs[0]:
                        if r[1]:
                            v = map(lambda rr: rr[0]['data']['id']['data'], r[1])
                        else:
                            v = []
                        v = repr(v)
                    elif k == self._isasubjob[0]:
                        if len(fqid) > 1:
                            v = 'Y'
                        else:
                            v = 'N'
                    elif k == self._counter[0]:
                        v = '1' # init subjob counter
                    elif k == self._compressed[0]:
                        if USE_COMPRESSED_BLOBS:
                            v = 'Y'
                        else:
                            v = 'N'
                    else:
                        v = r[0]['data'][k]
                        if v['simple']:
                            v = str(v['data'])
                        else:
                            v = v['name']
                    if t == 'text':
                        v = self._pstr2text(v)
                    if v == '':
                        v = 'None'
                    res['metadata'][k] = v
                if deep:
                    return (res, map(lambda sr: extractMD(sr, fqid), r[1]))
                else:
                    return (res, [])

            mfqid = self._getFQID(job)[:-1] # mfqid is the fqid of the master
            jtree = Parser.extractSubJobs(self._streamer._getDictFromJob(job))
            return extractMD(jtree, mfqid)

        finally:
            self._rep_lock.release()
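
    # Illustrative note (not part of the original source): _getValues returns a
    # nested pair of the form
    #   ({'fqid': (...), 'metadata': {attr: value, ...}}, [<same pair for each subjob>])
    # i.e. one per-attribute string record for the job plus a list of records
    # for its subjobs (empty when deep is False).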

    #--------------------------------------------------------------------------------------
    def _commitJobs(self, jobs, forced_action = False, deep = True, register = False, get_ids = True):
        self._rep_lock.acquire(1)
        try:
            if register:
                sch = self.schema
            else:
                sch = self._commit_schema
            attrs = map(lambda x: x[0], sch)
            details = {}
            msgs = []

            def commit_visitor(md, path):
                try:
                    fqid = md[0]['fqid']
                    if fqid not in self._job_cache:
                        if not register:
                            msg = "Job %s is not found in the job cache. Committing this job will normally fail." % str(fqid)
                            raise RepositoryError(msg = msg)
                    vals_dict = md[0]['metadata']
                    vals = map(lambda x: vals_dict[x], attrs)
                    updatecond = (not register) or (deep and USE_COUNTERS_FOR_SUBJOBS and len(md[1]) > 0)
                    if updatecond:
                        self._generic_updateAttr(fqid, attrs, vals, forced_action)
                    else:
                        self._generic_addEntry(fqid, attrs, vals)
                except (CommandException, RepositoryError), e:
                    msg = "_commitJobs() command called while committing job %s raised an exception: %s" % (str(fqid), str(e))
                    msgs.append(msg)
                    # logger.error(msg)
                    details[fqid] = e
                else:
                    job_cache[fqid] = self.guid
                    if deep:
                        # commit subjobs
                        sj_path = self._getSubJobPath(fqid)
                        for smd in md[1]:
                            commit_visitor(smd, sj_path)

            def register_visitor(j, path, reserve):
                # get job ids first
                try:
                    if j.master and USE_COUNTERS_FOR_SUBJOBS:
                        def get_id(reserve):
                            return int(self._counterNext(self._getFQID(j.master), reserve)) - 1
                    else:
                        def get_id(reserve):
                            return int(self._generic_sequenceNext(self._getSequenceName(path), reserve)) - 1
                    if get_ids:
                        # getting id
                        j.id = get_id(reserve)
                    else:
                        # adjust the job counter
                        while get_id(0) < j.id:
                            continue
                except CommandException, e:
                    msg = "sequenceNext() command called while registering jobs raised an exception: %s" % str(e)
                    # logger.error(msg)
                    raise RepositoryError(e = e, msg = msg)
                else:
                    if deep:
                        # reservation for the counter
                        sj_reserve = len(j.subjobs)
                        if sj_reserve > 0:
                            fqid = self._getFQID(j)
                            sj_path = self._getSubJobPath(fqid)
                            if USE_FOLDERS_FOR_SUBJOBS:
                                # initialize the subjobs directory first
                                self._initDir(sj_path)
                            if USE_COUNTERS_FOR_SUBJOBS:
                                # initialize counter
                                self._initCounter(fqid)
                            for sj in j.subjobs:
                                register_visitor(sj, sj_path, sj_reserve)

            # sort jobs by their path
            job_categs = self._sortJobsByJobFolders(jobs)

            # loop over categories
            for path in job_categs:
                jobs = job_categs[path]
                if register:
                    jobs_to_commit = []
                    # reservation for the counter
                    j_reserve = len(jobs)
                    if j_reserve > 0:
                        # initialize the directory first
                        if path != self.root_dir:
                            self._initDir(path)
                    for j in jobs:
                        try:
                            register_visitor(j, path, j_reserve)
                        except Exception, e:
                            msg = str(e)
                            # logger.error(msg)
                            details[path] = RepositoryError(e = e, msg = msg)
                            msgs.append(msg)
                        else:
                            jobs_to_commit.append(j)
                else:
                    jobs_to_commit = jobs

                job_cache = {}
                timestamp = repr(time.time())
                try:
                    self._initCommand()
                    try:
                        for j in jobs_to_commit:
                            commit_visitor(self._getValues(j, timestamp, deep), path)
                    finally:
                        self._finalizeCommand()
                except Exception, e:
                    msg = str(e)
                    # logger.error(msg)
                    details[path] = RepositoryError(e = e, msg = msg)
                    msgs.append(msg)
                else:
                    self._job_cache.update(job_cache)

            if details:
                raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)

        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _fqnConverter(self, i):
        assert(type(i) in [types.TupleType, types.ListType, types.IntType])
        if type(i) == types.IntType:
            return (i,)
        else:
            return i

    #--------------------------------------------------------------------------------------
    def _getSelectionAndPath(self, selection):
        self._rep_lock.acquire(1)
        try:
            assert(type(selection) == types.DictionaryType)
            if selection.has_key('table_path'):
                s, p = (selection['attributes'], selection['table_path'])
            else:
                s, p = (selection, self.root_dir)
            if not USE_FOLDERS_FOR_SUBJOBS:
                if not s.has_key(self._isasubjob[0]):
                    # select only top level jobs
                    s[self._isasubjob[0]] = 'N'
            return (s,p)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getUpdateExpr(self, name, value):
        value = value.replace('\'', '\\\'')
        return '%s \'"%s"\''%(name, value)
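
    # Illustrative example (not part of the original source): _getUpdateExpr
    # quotes a value for an AMGA updateAttr expression, e.g.
    #   self._getUpdateExpr('status', 'new')   ->   status '"new"'
    # with any single quotes inside the value backslash-escaped first.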

    #--------------------------------------------------------------------------------------
    def _initBulkGetAttr(self):
        pass

    #--------------------------------------------------------------------------------------
    def _finalizeBulkGetAttr(self):
        pass

    #--------------------------------------------------------------------------------------
    def _initBulkRm(self):
        pass

    #--------------------------------------------------------------------------------------
    def _finalizeBulkRm(self):
        pass

    #--------------------------------------------------------------------------------------
    def _getExtendedAttrList(self, attr_list):
        lock = self._lock[0]
        istate = self._istate[0]
        lock_time = self._time[0]

        extended_attr_list = attr_list[:]
        extra_attr = [lock, istate, lock_time]
        for a in extra_attr:
            if a not in extended_attr_list:
                extended_attr_list.append(a)
        return (extended_attr_list, map(lambda a: extended_attr_list.index(a)+1, extra_attr))
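
    # Illustrative example (not part of the original source): for
    # attr_list = ['status'] this returns
    #   (['status', 'lock', 'istate', 'time'], [2, 3, 4])
    # where the +1 offset accounts for the job fqid that _getMetaData prepends
    # at position 0 of every metadata row.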

    #--------------------------------------------------------------------------------------
    def _getMetaData(self, ids_or_attributes, attr_list):
        """ids_or_attributes is used to make a selection in the DB;
        attr_list is used to specify the return value.
        Note, the first item of every list in the returned list is always the job fqid.
        """
        self._rep_lock.acquire(1)
        try:
            md_list = []
            (extended_attr_list,
             (lock_index, istate_index, lock_time_index)) = self._getExtendedAttrList(attr_list)

            def update_cache(fqid):
                # update job cache
                job_lock = md[lock_index]
                if fqid in self._job_cache:
                    if self._job_cache[fqid] != job_lock:
                        logger.warning('Job %s has been modified outside of the current session' % self._getJobFileName(fqid))
                self._job_cache[fqid] = job_lock

            if type(ids_or_attributes) in [types.TupleType, types.ListType]:
                self._initBulkGetAttr()
                for jid in ids_or_attributes:
                    fqid = self._fqnConverter(jid)
                    fqn = self._getJobFileName(fqid)
                    try:
                        self._generic_getattr(fqid, extended_attr_list)
                    except CommandException, e:
                        msg = "ARDA interface command getattr() called for job %s raised an exception: %s" % (str(fqid), str(e))
                        logger.error(msg)
                    else:
                        while not self._generic_eot():
                            try:
                                f, md = self._generic_getEntry()
                                assert(os.path.basename(fqn) == f)
                            except Exception, e:
                                msg = "ARDA interface command getEntry() called for job %s raised an exception: %s" % (str(fqid), str(e))
                                logger.error(msg)
                            else:
                                md.insert(0, fqid)
                                md_list.append(md)
                                update_cache(fqid)
                self._finalizeBulkGetAttr()
            else:
                selection, path = self._getSelectionAndPath(ids_or_attributes)
                try:
                    self._generic_selectAttr(selection, path, extended_attr_list)
                except CommandException, e:
                    msg = "ARDA interface command selectAttr() raised an exception: %s" % str(e)
                    logger.error(msg)
                    # raise RepositoryError(e = e, msg = msg)
                else:
                    while not self._generic_eot():
                        try:
                            md = self._generic_getSelectAttrEntry()
                        except Exception, e:
                            msg = "ARDA interface command getSelectAttrEntry() raised an exception: %s" % str(e)
                            logger.error(msg)
                        else:
                            f = os.path.join(path, md[0])
                            fqid = self._getFQIDfromName(f)
                            md[0] = fqid
                            md_list.append(md)
                            update_cache(fqid)
            return md_list
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getLockedMetaData(self, ids, attr_list, istate = '_', forced_action = False):
        """The same function as _getMetaData, but it first tries to lock the data using istate.
        It returns a non-empty list only for the entries that have actually been locked. It only
        accepts a list of fqids as the ids parameter.
        The idea is that it checks the lock and retrieves the metadata within one call.
        """
        self._rep_lock.acquire(1)
        try:
            (extended_attr_list,
             (lock_index, istate_index, lock_time_index)) = self._getExtendedAttrList(attr_list)
            self._setLock(ids, istate, forced_action)
            md_list = self._getMetaData(ids, attr_list)
            for i in range(len(md_list)-1, -1, -1):
                md = md_list[i]
                if md[lock_index] == self.guid and md[istate_index] == istate:
                    continue
                else:
                    del md_list[i]
            return md_list
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _setLock(self, ids, istate, forced_action = False):
        attrs = [self._lock[0], self._istate[0], self._time[0]]
        values = [self.guid, istate, repr(time.time())]
        try:
            self._setMetaData(ids, attrs, values, forced_action)
        except BulkOperationRepositoryError, e:
            logger.debug(str(e))

    #--------------------------------------------------------------------------------------
    def _setMetaData(self, ids, attrs, values, forced_action = False, new = False):
        self._rep_lock.acquire(1)
        try:
            details = {}
            msgs = []
            update = (self._lock[0] in attrs)
            self._initCommand()
            try:
                for jid in ids:
                    fqid = self._fqnConverter(jid)
                    try:
                        if new:
                            self._generic_addEntry(fqid, attrs, values)
                        else:
                            self._generic_updateAttr(fqid, attrs, values, forced_action)
                    except Exception, e:
                        msg = str(e)
                        # logger.error(msg)
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)
                    else:
                        if update:
                            self._job_cache[fqid] = self.guid
            finally:
                self._finalizeCommand()

            if details:
                raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _initCounter(self, fqid):
        self._rep_lock.acquire(1)
        try:
            attrs = [self._counter[0], self._lock[0], self._istate[0], self._time[0]]
            values = ['1', self.guid, '_', repr(time.time())]
            self._setMetaData([fqid], attrs, values, new = True)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _counterNext(self, fqid, reserve = 1):
        # with reserve > 1 this method returns
        # a reserved value for the counter if there is one available,
        # otherwise it will reserve the specified number of values and return the first one
        self._rep_lock.acquire(1)
        try:
            def next(fqid, reserve):
                metadata = self._getLockedMetaData([fqid], [self._counter[0]], istate = '_counter')
                if metadata:
                    md = metadata[0]
                    assert(md[0] == fqid)
                    now = md[1]
                    newval = str(int(now) + reserve)
                    attrs = [self._counter[0], self._lock[0], self._istate[0], self._time[0]]
                    values = [newval, self.guid, '_', repr(time.time())]
                    self._setMetaData([fqid], attrs, values)
                    return (now, newval)
                else:
                    raise RepositoryError(msg = 'Can not lock job %s' % str(fqid))
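
            # Illustrative note (not part of the original source): the
            # reservation scheme below trades spare ids for fewer table reads;
            # a call with reserve = 5 bumps the stored counter by 5 in one
            # locked update, and the next four requests for the same fqid are
            # then served from self.sjid_reserve without touching the table.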

            # use reserved values if possible (don't read the table)
            if reserve > 1:
                if not hasattr(self, 'sjid_reserve'):
                    self.sjid_reserve = {}
                try:
                    if fqid in self.sjid_reserve:
                        now, reserved = self.sjid_reserve[fqid]
                    else:
                        now, reserved = map(int, next(fqid, reserve))
                        self.sjid_reserve[fqid] = [now, reserved]
                    newval = now + 1
                    if newval >= reserved:
                        del self.sjid_reserve[fqid]
                    else:
                        self.sjid_reserve[fqid][0] = newval
                    return str(now)
                except Exception, e:
                    raise RepositoryError(e = e, msg = "getNextSubjobId error: " + str(e))
            else:
                return next(fqid, reserve)[0]
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _initCommand(self):
        try:
            self.transaction()
        except Exception, e:
            msg = str(e)
            logger.error(msg)
            raise RepositoryError(e = e, msg = msg)

    #--------------------------------------------------------------------------------------
    def _finalizeCommand(self):
        try:
            try:
                self.commit()
            except CommandException, e:
                if e.errorCode != 9:
                    raise e
        except Exception, e:
            msg = str(e)
            logger.error(msg)
            raise RepositoryError(e = e, msg = msg)

    #--------------------------------------------------------------------------------------
    def _generic_addEntry(self, fqid, attrs, values):
        self.addEntry(self._getJobFileName(fqid), attrs, values)

    #--------------------------------------------------------------------------------------
    def _generic_updateAttr(self, fqid, attrs, values, forced_action):
        updateExpr = map(self._getUpdateExpr, attrs, values)
        condition = self._getCondition(fqid, forced_action)
        self.updateAttr(self._getJobFileName(fqid), updateExpr, condition)

    #--------------------------------------------------------------------------------------
    def _generic_eot(self):
        return self.eot()

    #--------------------------------------------------------------------------------------
    def _generic_getattr(self, fqid, attr_list):
        self.getattr(self._getJobFileName(fqid), attr_list)

    #--------------------------------------------------------------------------------------
    def _generic_getEntry(self):
        return self.getEntry()

    #--------------------------------------------------------------------------------------
    def _generic_selectAttr(self, selection, path, attr_list):
        # always return the filename as the first attribute
        query = ' and '.join(map(lambda x: ':'.join([path, '%s="%s"'%x]), selection.items()))
        attr = map(lambda x: ':'.join([path, x]), attr_list)
        attr.insert(0, ':'.join([path, 'FILE']))
        self.selectAttr(attr, query)
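
    # Illustrative example (not part of the original source): for selection
    # {'status': 'new'} and path '/users/foo/jobs' the query becomes
    #   /users/foo/jobs:status="new"
    # and '/users/foo/jobs:FILE' is prepended to the attribute list so the
    # entry name always comes back as the first column.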

    #--------------------------------------------------------------------------------------
    def _generic_getSelectAttrEntry(self):
        return self.getSelectAttrEntry()

    #--------------------------------------------------------------------------------------
    def _generic_rm(self, fqid, forced_action):
        condition = self._getCondition(fqid, forced_action)
        # self.rm(self._getJobFileName(fqid), condition) ## Is it supported?
        self.rm(self._getJobFileName(fqid))

    #--------------------------------------------------------------------------------------
    def _generic_sequenceNext(self, name, reserve = 0):
        return self.sequenceNext(name)

    #--------------------------------------------------------------------------------------
    def registerJobs(self, jobs, masterJob = None):
        self._rep_lock.acquire(1)
        try:
            if masterJob:
                for j in jobs:
                    if j.master:
                        assert(j.master is masterJob)
                    else:
                        j._setParent(masterJob)
                self._commitJobs(jobs, forced_action = True, register = True)
                self._commitJobs([masterJob], deep = False)
            else:
                self._commitJobs(jobs, forced_action = True, register = True)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def commitJobs(self, jobs, forced_action = False, deep = True):
        # all directories must be initialized
        self._commitJobs(jobs, forced_action, deep)

    #--------------------------------------------------------------------------------------
    def checkoutJobs(self, ids_or_attributes, deep = True):
        self._rep_lock.acquire(1)
        try:
            jobs = []
            attr_list = [self._subjobs[0], self._compressed[0], self._blob[0]]

            def sorter(x, y):
                idx, idy = map(lambda x: x[0]['data']['id']['data'], (x,y))
                if idx > idy:
                    return 1
                elif idx < idy:
                    return -1
                else:
                    return 0

            def visitor(md):
                try:
                    sjobs = eval(self._text2pstr(md[1]))
                    pstr = self._text2pstr(md[3])
                    if md[2] == 'Y':
                        pstr = self._decompress(pstr)
                    jdict = eval(pstr)
                    rr = []
                    if deep and sjobs:
                        if USE_FOLDERS_FOR_SUBJOBS:
                            path = self._getSubJobPath(md[0])
                            dd = {'table_path':path, 'attributes':{}}
                        else:
                            dd = []
                            mfqid = list(md[0])
                            for s in sjobs:
                                sfqid = mfqid[:]
                                sfqid.append(s)
                                dd.append(tuple(sfqid))
                        metadata = self._getMetaData(dd, attr_list)
                        for md in metadata:
                            rr.append(visitor(md))
                        if USE_FOLDERS_FOR_SUBJOBS:
                            rr.sort(sorter)
                except Exception, e:
                    msg = 'Dictionary of job %s cannot be evaluated because of the error: %s. The job is most likely corrupted and will not be imported.' % (str(md[0]), str(e))
                    raise RepositoryError(e = e, msg = msg)
                else:
                    return [jdict, rr]

            metadata = self._getMetaData(ids_or_attributes, attr_list)
            for md in metadata:
                try:
                    jdict = Parser.insertSubJobs(visitor(md))
                    job = self._streamer._getJobFromDict(jdict)
                except Exception, e:
                    msg = 'Exception: %s while constructing job object from a dictionary' % (str(e))
                    logger.error(msg)
                else:
                    jobs.append(job)
            return jobs
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def deleteJobs(self, ids, forced_action = False):
        self._rep_lock.acquire(1)
        try:
            details = {}
            msgs = []

            def getSubjobs(ids, deep = True):
                # if deep = True the format of the return value is [ [fqid, [ [sfqid,[...]], [...], ...] ],...]
                # if deep = False the format of the return value is [ [fqid, [ sfqid, ...] ],...]
                sjlist = self._getLockedMetaData(ids, [self._subjobs[0]], '_deleting', forced_action)
                # sjlist = self._getMetaData(ids, [self._subjobs[0]])
                sj_len = len(sjlist)
                ids_len = len(ids)
                if sj_len != ids_len:
                    # check for errors
                    for i in range(ids_len):
                        jid = ids[i]
                        fqid = self._fqnConverter(jid)
                        bp = min(i, sj_len)
                        for k in range(bp, sj_len) + range(0, bp):
                            if len(sjlist[k]) > 0:
                                if fqid == sjlist[k][0]:
                                    break
                        else:
                            msg = "Can not remove job %s" % str(fqid)
                            details[fqid] = RepositoryError(msg = msg)
                            msgs.append(msg)
                for md in sjlist:
                    del md[2:]
                    sjobs = eval(self._text2pstr(md[1]))
                    mfqid = list(md[0])
                    for i in range(len(sjobs)):
                        sfqid = mfqid[:]
                        sfqid.append(sjobs[i])
                        sjobs[i] = tuple(sfqid)
                    if deep:
                        md[1] = getSubjobs(sjobs, deep)
                    else:
                        md[1] = sjobs
                return sjlist

            def rm(sjlist):
                deleted_jobs = []
                for (fqid, ssjlist) in sjlist:
                    try:
                        self._generic_rm(fqid, forced_action)
                    except (RepositoryError, CommandException), e:
                        msg = "deleteJobs() command called while deleting job %s raised an exception: %s" % (str(fqid), str(e))
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)
                    else:
                        # remove the job id from the local cache
                        if fqid in self._job_cache:
                            del self._job_cache[fqid]
                        # indicate that subjobs have to be deleted
                        deleted_jobs.append([fqid, rm(ssjlist)])
                return deleted_jobs

            sjlist = getSubjobs(ids)

            # loop over jobs
            self._initBulkRm()
            try:
                deleted_jobs = rm(sjlist)
            finally:
                self._finalizeBulkRm()

            # proceed with the deleted jobs
            if USE_FOLDERS_FOR_SUBJOBS:
                # NB (assumption): the original listing used 'fqid' here
                # without a loop over the deleted jobs; iterating over them
                # is the apparent intent
                for (fqid, ssjlist) in deleted_jobs:
                    # remove the subjob folder (if any)
                    try:
                        self._forcedRemoveDir(self._getSubJobPath(fqid))
                    except Exception, e:
                        msg = "Can not remove subjobs folder of the job %s because of the error: %s" % (str(fqid), str(e))
                        # logger.error(msg)
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)

            # update the metadata of the master jobs
            mfqids = {} # dictionary keys are master job fqids for which the list of subjobs has to be updated
            for (fqid, ssjlist) in deleted_jobs:
                mfqid = fqid[:-1]
                if mfqid:
                    if not mfqid in mfqids:
                        mfqids[mfqid] = []
                    mfqids[mfqid].append(fqid)

            sjlist = getSubjobs(mfqids.keys(), deep = False) # list of master jobs and all subjobs (to be updated)
            # updating ...
            self._initCommand()
            try:
                attrs_r = [self._subjobs[0], self._lock[0], self._istate[0], self._time[0], self._counter[0]]
                values_r = ['', self.guid, '_', repr(time.time()), '1']
                attrs_s = attrs_r[:-1]
                values_s = values_r[:-1]
                for (mfqid, ssjlist) in sjlist:
                    try:
                        for fqid in mfqids[mfqid]:
                            ssjlist.remove(fqid)
                        for i in range(len(ssjlist)):
                            ssjlist[i] = ssjlist[i][-1] # reuse the same list to reduce memory consumption
                        if len(ssjlist) == 0:
                            # reset counter
                            attrs = attrs_r
                            values = values_r
                        else:
                            attrs = attrs_s
                            values = values_s
                        values[0] = self._pstr2text(repr(ssjlist)) # subjob ids
                        self._generic_updateAttr(mfqid, attrs, values, forced_action)
                    except Exception, e:
                        msg = "Can not update master job %s of subjob %s because of the error: %s" % (str(mfqid), str(fqid), str(e))
                        # logger.error(msg)
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)
            finally:
                self._finalizeCommand()

            if USE_FOLDERS_FOR_SUBJOBS:
                for (mfqid, ssjlist) in sjlist:
                    try:
                        if len(ssjlist) == 0:
                            # remove the subjob folder (if any)
                            self._forcedRemoveDir(self._getSubJobPath(mfqid))
                    except Exception, e:
                        msg = "Can not unsplit master job %s after deleting the last subjob %s because of the error: %s" % (str(mfqid), str(fqid), str(e))
                        # logger.error(msg)
                        details[fqid] = RepositoryError(e = e, msg = msg)
                        msgs.append(msg)

            if details:
                raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def getJobIds(self, ids_or_attributes):
        self._rep_lock.acquire(1)
        try:
            try:
                metadata = self._getMetaData(ids_or_attributes, ['id'])
            except RepositoryError, e:
                msg = 'Exception: %s while getting job ids.' % str(e)
                logger.error(msg)
                raise e
            return map(lambda x: x[0], metadata)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def getJobAttributes(self, ids_or_attributes):
        self._rep_lock.acquire(1)
        try:
            mdlist = []
            attr_list = self._commit_schema[:]
            attr_list.remove(self._blob) # no blob
            attr_list.remove(self._lock) # no lock
            attr_list = map(lambda x: x[0], attr_list)
            try:
                metadata = self._getMetaData(ids_or_attributes, attr_list)
            except RepositoryError, e:
                msg = 'Exception: %s while getting job attributes.' % str(e)
                logger.error(msg)
                raise e
            for md in metadata:
                mdd = {}
                for a, v in zip(attr_list, md[1:]):
                    if a == self._subjobs[0]:
                        v = eval(self._text2pstr(v))
                    mdd[a] = v
                mdlist.append(mdd)
            return mdlist
        finally:
            self._rep_lock.release()
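
    # Illustrative note (not part of the original source): each element of the
    # returned list is a dict keyed by schema attribute, e.g.
    #   {'id': '42', 'status': 'new', 'subjobs': [], ...}
    # with the 'subjobs' text field already decoded back into a Python list.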

    #--------------------------------------------------------------------------------------
    def setJobsStatus(self, statusList, forced_action = False):
        self._rep_lock.acquire(1)
        try:
            details = {}
            msgs = []
            attr_list = [self._compressed[0], self._blob[0]]
            attrs = ['status', self._compressed[0], self._blob[0], self._lock[0], self._istate[0], self._time[0]]

            statusList = map(lambda x: (self._fqnConverter(x[0]), x[1]), statusList)

            # retrieve all jobs
            fqids = map(lambda x: x[0], statusList)
            metadata = self._getMetaData(fqids, attr_list)
            mdids = map(lambda x: x[0], metadata)

            istate = '_'
            timestamp = repr(time.time())

            self._initCommand()
            try:
                for fqid, status in statusList:
                    if fqid in mdids:
                        md = metadata[mdids.index(fqid)]
                        pstr = self._text2pstr(md[2])
                        if md[1] == 'Y':
                            pstr = self._decompress(pstr)
                        jdict = eval(pstr)
                        jdict['data']['status']['data'] = status
                        pstr = repr(jdict)
                        if USE_COMPRESSED_BLOBS:
                            compressed = 'Y'
                            pstr = self._compress(pstr)
                        else:
                            compressed = 'N'
                        blob = self._pstr2text(pstr)
                        vals = [status, compressed, blob, self.guid, istate, timestamp]
                        try:
                            self._generic_updateAttr(fqid, attrs, vals, forced_action)
                        except (CommandException, RepositoryError), e:
                            msg = "setJobsStatus() command called while committing job %s raised an exception: %s" % (str(fqid), str(e))
                            msgs.append(msg)
                            # logger.error(msg)
                            details[fqid] = RepositoryError(e = e, msg = msg)
                        else:
                            self._job_cache[fqid] = self.guid
                    else:
                        msg = "Job %s is not found in the repository" % str(fqid)
                        msgs.append(msg)
                        # logger.error(msg)
                        details[fqid] = RepositoryError(msg = msg)
                if details:
                    raise BulkOperationRepositoryError(msg = '\n'.join(msgs), details = details)
            finally:
                self._finalizeCommand()
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def getJobsStatus(self, ids_or_attributes):
        self._rep_lock.acquire(1)
        try:
            attr_list = ['status']
            try:
                metadata = self._getMetaData(ids_or_attributes, attr_list)
            except RepositoryError, e:
                msg = 'Exception: %s while getting job status.' % str(e)
                logger.error(msg)
                raise e
            return metadata
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def getJobTree(self, tree_id = 0):
        self._rep_lock.acquire(1)
        try:
            md_list = []
            jtreefldr = os.path.join(self.root_dir, self._jobsTreeFldr)
            attrs = [self._jobsTreeAttr[0]]
            fn = os.path.join(jtreefldr, str(tree_id))
            try:
                self.getattr(fn, attrs)
            except CommandException, e:
                #logger.warning(str(e))
                logger.debug(str(e))
                return
            while not self.eot():
                try:
                    file, values = self.getEntry()
                except Exception, e:
                    logger.error(str(e))
                else:
                    md_list.append(values[0])
            if self._tree_streamer:
                if md_list:
                    return self._tree_streamer.getTreeFromStream(self._text2pstr(md_list[0]))
            else:
                logger.warning("JobTree streamer has not been set.")
                logger.warning("jobtree object can not be retrieved from repository")
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def setJobTree(self, jobtree, tree_id = 0):
        self._rep_lock.acquire(1)
        try:
            jtreefldr = os.path.join(self.root_dir, self._jobsTreeFldr)
            attrs = [self._jobsTreeAttr[0]]
            fn = os.path.join(jtreefldr, str(tree_id))
            if self._tree_streamer:
                val = self._tree_streamer.getStreamFromTree(jobtree)
                values = [self._pstr2text(val)]
            else:
                logger.warning("JobTree streamer has not been set.")
                logger.warning("jobtree object can not be saved in the repository")
                return
            try:
                self.listEntries(jtreefldr)
                while not self.eot():
                    try:
                        file, type = self.getEntry()
                    except Exception, e:
                        logger.error(str(e))
                    else:
                        if os.path.basename(file) == os.path.basename(fn):
                            self.setAttr(fn, attrs, values)
                            break
                else:
                    self.addEntry(fn, attrs, values)
            except CommandException, e:
                logger.debug(str(e))
                raise RepositoryError(e = e, msg = str(e))
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def resetAll(self):
        """Replaces the root directory and all its content with a fresh, empty, initialized directory.
        """
        self._rep_lock.acquire(1)
        try:
            try:
                # special treatment for the job tree
                jtreefldr = os.path.join(self.root_dir, self._jobsTreeFldr)
                self._forcedRemoveDir(jtreefldr, [self._jobsTreeAttr], remove_sequence = False)
                # remove root_dir
                self._forcedRemoveDir(self.root_dir)
                # init the root dir again
                self._initAll()
            except Exception, e:
                raise RepositoryError(e = e, msg = str(e))
        finally:
            self._rep_lock.release()


###########################################################################################
class RemoteARDAJobRepository(ARDARepositoryMixIn, MDClient):

    #--------------------------------------------------------------------------------------
    def __init__(self,
                 schema,
                 role,
                 streamer,
                 tree_streamer,
                 root_dir,
                 host = 'gangamd.cern.ch',
                 port = 8822,
                 login = 'user',
                 password = 'ganga',
                 reqSSL = True,
                 keepalive = True,
                 init_schema = True,
                 **kwds):
        MDClient.__init__(self,
                          host = host,
                          port = port,
                          login = login,
                          password = password,
                          keepalive = keepalive)

        if reqSSL:
            fn = self._getGridProxy()
            key = kwds.get('key')
            if not key:
                key = fn
            cert = kwds.get('cert')
            if not cert:
                cert = fn

            MDClient.requireSSL(self, key, cert)
        try:
            MDClient.connect(self)
        except Exception, e:
            msg = "Can not connect to the Repository because of the error: %s" % str(e)
            logger.error(msg)
            raise RepositoryError(e = e, msg = msg)

        ARDARepositoryMixIn.__init__(self, schema, role,
                                     streamer,
                                     tree_streamer,
                                     root_dir,
                                     init_schema)

    #--------------------------------------------------------------------------------------
    def _getGridProxy(self):
        import Ganga.GPIDev.Credentials
        gp = Ganga.GPIDev.Credentials.getCredential("GridProxy")
        try:
            if not gp.isValid():
                gp.create()
            fn = gp.location()
        except Exception, e:
            msg = 'Exception: %s while getting proxy location' % str(e)
            logger.error(msg)
            fn = ''
        return fn

    #--------------------------------------------------------------------------------------
    def _isDirNotFoundError(self, e):
        # FIX to be done by Birger:
        # the remote repository raises error no 11 if a directory is not found,
        # but it should raise error no 1 instead
        # type(e) == CommandException
        if e.errorCode == 1:
            return True
        elif e.errorCode == 11 and e.msg.startswith('Not a directory'):
            return True
        return False

    #--------------------------------------------------------------------------------------
    def _isNotASequenceError(self, e):
        # type(e) == CommandException
        return e.errorCode == 17 or e.errorCode == 1

    #--------------------------------------------------------------------------------------
    def removeAllLocks(self):
        logger.error("method removeAllLocks should not be called for remote registry")

    #--------------------------------------------------------------------------------------
    def releaseAllLocks(self):
        #logger.error("method releaseAllLocks should not be called for remote registry")
        pass

    #--------------------------------------------------------------------------------------
    def listAllLocks(self):
        logger.error("method listAllLocks should not be called for remote registry")
        return []


###########################################################################################
class RemoteOracleARDAJobRepository(RemoteARDAJobRepository):

    _blobsFldr = 'blobs'
    _blobsFldrAttr = ('jobBlob', 'text')

    ## Blobs are stored in a separate table ('blobs') and are referenced in the
    ## main table using blob ids.
    ## On commit a new blob is added to the blob table. Old blobs stay in the
    ## blob table until the clean-up procedure takes place.

    #--------------------------------------------------------------------------------------
    def __init__(self,
                 schema,
                 role,
                 streamer,
                 tree_streamer,
                 root_dir,
                 host = 'gangamd.cern.ch',
                 port = 8822,
                 login = 'user',
                 password = 'ganga',
                 reqSSL = True,
                 keepalive = True,
                 init_schema = True,
                 **kwds):
        RemoteARDAJobRepository.__init__(self,
                                         schema,
                                         role,
                                         streamer,
                                         tree_streamer,
                                         root_dir,
                                         host,
                                         port,
                                         login,
                                         password,
                                         reqSSL,
                                         keepalive,
                                         init_schema,
                                         **kwds)

    #--------------------------------------------------------------------------------------
    def _isNotASequenceError(self, e):
        # type(e) == CommandException
        return e.errorCode == 11

    #--------------------------------------------------------------------------------------
    def _removeAllAttributes(self, path, schema):
        # removes all attributes from the directory "path"
        self._rep_lock.acquire(1)
        try:
            if not schema:
                schema = self.schema
            self._initCommand()
            try:
                try:
                    map(lambda x: self.removeAttr(path, x[0]), schema)
                except CommandException, e:
                    if e.errorCode == 9: # TODO: Oracle backend error
                        pass
            finally:
                self._finalizeCommand()
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _getBlobsFldrName(self, path):
        return os.path.join(path, self._blobsFldr)

    #--------------------------------------------------------------------------------------
    def _getBlobFileName(self, fqid, guid):
        path = self._getBlobsFldrName(self._getJobFolderName(fqid))
        basename = map(str, fqid)
        basename.append(guid)
        fn = '.'.join(basename)
        return os.path.join(path, fn)
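
    # Illustrative example (not part of the original source): for fqid (12, 3)
    # and guid 'a1b2' the blob is stored as '<job folder>/blobs/12.3.a1b2', so
    # the pattern '12.3.*' used by _generic_rm matches every blob revision of
    # that job.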

    #--------------------------------------------------------------------------------------
    def _initDir(self, path, schema = None, create_sequence = True):
        self._rep_lock.acquire(1)
        try:
            # check if the main dir exists; if not, create it
            RemoteARDAJobRepository._initDir(self, path, schema, create_sequence)

            # init the blobs directory
            RemoteARDAJobRepository._initDir(self,
                                             self._getBlobsFldrName(path),
                                             schema = [self._blobsFldrAttr],
                                             create_sequence = False)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _forcedRemoveDir(self, path, schema = None, remove_sequence = True):
        # if the directory does not exist, silently exits
        self._rep_lock.acquire(1)
        try:
            # remove the blobs folder first
            RemoteARDAJobRepository._forcedRemoveDir(self,
                                                     self._getBlobsFldrName(path),
                                                     schema = [self._blobsFldrAttr],
                                                     remove_sequence = False)
            # remove all the rest
            RemoteARDAJobRepository._forcedRemoveDir(self, path, schema, remove_sequence)
        finally:
            self._rep_lock.release()

    #--------------------------------------------------------------------------------------
    def _addBlob(self, fqid, attrs, values):
        if not self._blob[0] in attrs:
            return
        blob_index = attrs.index(self._blob[0])
        blob = values[blob_index]
        values[blob_index] = newGuid(values)
        # add blob
        blob_fn = self._getBlobFileName(fqid, values[blob_index])
        self.addEntry(blob_fn, [self._blobsFldrAttr[0]], [blob])

    #--------------------------------------------------------------------------------------
    def _generic_addEntry(self, fqid, attrs, values):
        values = values[:] # values will be changed by _addBlob
        # add the blob first
        self._addBlob(fqid, attrs, values)
        # add all other metadata
        RemoteARDAJobRepository._generic_addEntry(self, fqid, attrs, values)

    #--------------------------------------------------------------------------------------
    def _generic_updateAttr(self, fqid, attrs, values, forced_action):
        values = values[:] # values will be changed by _addBlob
        # add the blob first
        self._addBlob(fqid, attrs, values)
        # update all other metadata
        RemoteARDAJobRepository._generic_updateAttr(self, fqid, attrs, values, forced_action)
1648 1649 #--------------------------------------------------------------------------------------
1650 - def _generic_eot(self):
1651          return not len(self._command_buffer)

1652
1653      #--------------------------------------------------------------------------------------
1654 - def _generic_getattr(self, fqid, attr_list):
1655          command_buffer = []
1656          RemoteARDAJobRepository._generic_getattr(self, fqid, attr_list)
1657          while not self.eot():
1658              command_buffer.append(self.getEntry())
1659          if self._blob[0] in attr_list:
1660              self._command_buffer = []
1661              blob_index = attr_list.index(self._blob[0])
1662              for (f, md) in command_buffer:
1663                  blob_fn = self._getBlobFileName(fqid, md[blob_index])
1664                  try:
1665                      self.getattr(blob_fn, [self._blobsFldrAttr[0]])
1666                  except CommandException, e:
1667                      msg = "ARDA interface command getattr() called for job %s raised an exception: %s" % (str(fqid), str(e))
1668                      logger.error(msg)
1669                  else:
1670                      while not self.eot():
1671                          fn, blob_md = self.getEntry()
1672                          assert(fn == os.path.basename(blob_fn))
1673                          md[blob_index] = blob_md[0]
1674                      self._command_buffer.append((f, md))
1675          else:
1676              self._command_buffer = command_buffer

1677
1678      #--------------------------------------------------------------------------------------
1679 - def _generic_getEntry(self):
1680          return self._command_buffer.pop(0)

1681
1682      #--------------------------------------------------------------------------------------
1683 - def _generic_selectAttr(self, selection, path, attr_list):
1684          # always return filename as the first attribute
1685          command_buffer = []
1686          RemoteARDAJobRepository._generic_selectAttr(self, selection, path, attr_list)
1687          while not self.eot():
1688              command_buffer.append(self.getSelectAttrEntry())
1689          if self._blob[0] in attr_list:
1690              self._command_buffer = []
1691              blob_index = attr_list.index(self._blob[0]) + 1
1692              for md in command_buffer:
1693                  f = os.path.join(path, md[0])
1694                  fqid = self._getFQIDfromName(f)
1695                  blob_fn = self._getBlobFileName(fqid, md[blob_index])
1696                  try:
1697                      self.getattr(blob_fn, [self._blobsFldrAttr[0]])
1698                  except CommandException, e:
1699                      msg = "ARDA interface command getattr() called for job %s raised an exception: %s" % (str(fqid), str(e))
1700                      logger.error(msg)
1701                  else:
1702                      while not self.eot():
1703                          fn, blob_md = self.getEntry()
1704                          assert(fn == os.path.basename(blob_fn))
1705                          md[blob_index] = blob_md[0]
1706                      self._command_buffer.append(md)
1707          else:
1708              self._command_buffer = command_buffer

1709
1710      #--------------------------------------------------------------------------------------
1711 - def _generic_getSelectAttrEntry(self):
1712          # md = self._command_buffer.pop(0)
1713          return self._command_buffer.pop(0)

1714
1715      #--------------------------------------------------------------------------------------
1716 - def _generic_rm(self, fqid, forced_action):
1717          # remove entry first
1718          RemoteARDAJobRepository._generic_rm(self, fqid, forced_action)
1719          # remove all associated blobs
1720          try:
1721              self.rm(self._getBlobFileName(fqid, '*'))
1722          except CommandException, e:
1723              msg = "Cannot delete blobs related to job %s because of the error: %s" % (str(fqid), str(e))
1724              logger.debug(msg)

1725
1726
1727 ###########################################################################################
1728 -class LocalARDAJobRepository(ARDARepositoryMixIn, MDStandalone):
1729
1730      #--------------------------------------------------------------------------------------
1731 -    def __init__(self,
1732                   schema,
1733                   role,
1734                   streamer,
1735                   tree_streamer,
1736                   root_dir,
1737                   local_root = '/tmp/',
1738                   blocklength = 1000,
1739                   cache_size = 100000,
1740                   tries_limit = 200,
1741                   lock_timeout = 1,
1742                   init_schema = True,
1743                   **kwds):

1744
1745          # last selected row in the table
1746          self.__row = 0
1747
1748          # create root dir, if missing
1749          if not os.path.isdir(local_root):
1750              try:
1751                  os.makedirs(local_root)
1752              except Exception, e:
1753                  logger.error(str(e))
1754                  raise e
1755              else:
1756                  logger.debug("Root directory %s has been successfully created" % local_root)
1757
1758
1759          # init MDStandalone
1760          MDStandalone.__init__(self, local_root,
1761                                blocklength = blocklength,
1762                                cache_size = cache_size,
1763                                tries_limit = tries_limit)
1764
1765          # check that the lock timeout is long enough, otherwise there is a risk of removing a valid lock
1766          # the retry time interval from diskutils is 0.05 s
1767          max_lock_time = 0.05*tries_limit
1768          if lock_timeout < max_lock_time:
1769              logger.warning("The 'lock_timeout' parameter is too small with respect to 'tries_limit' = %d" % tries_limit)
1770              logger.warning("To avoid the risk of deleting a valid lock file, the 'lock_timeout' parameter will be adjusted automatically")
1771              lock_timeout = max_lock_time
1772          self._lock_timeout = lock_timeout
1773
1774          # remove old table locks
1775          try:
1776              old_locks = MDStandalone.listAllLocks(self, self._lock_timeout)
1777              if old_locks:
1778                  logger.warning("Lock files older than %d seconds were found in the repository" % self._lock_timeout)
1779                  logger.warning("These locks most likely remain from previous errors and will be removed")
1780                  logger.warning("You can suppress this operation by increasing the 'lock_timeout' parameter of the local repository constructor")
1781                  logger.warning("Deleting old lock files: %s", old_locks)
1782                  MDStandalone.removeAllLocks(self, self._lock_timeout)
1783          except Exception, e:
1784              logger.debug(str(e))
1785              Ganga.Utility.logging.log_user_exception(logger)
1786              raise RepositoryError(e = e, msg = str(e))
1787
1788          # init ARDARepositoryMixIn
1789          ARDARepositoryMixIn.__init__(self, schema, role,
1790                                       streamer,
1791                                       tree_streamer,
1792                                       root_dir,
1793                                       init_schema)
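As a worked example of the adjustment above: with the default tries_limit of 200 and the 0.05 s retry interval used by diskutils, the smallest safe lock_timeout is 10 seconds, so the constructor's default of 1 second is raised automatically. A minimal sketch of the arithmetic (RETRY_INTERVAL is an illustrative name for the diskutils interval):

    RETRY_INTERVAL = 0.05            # retry interval from diskutils (per the comment above)
    tries_limit = 200                # constructor default
    lock_timeout = 1                 # constructor default

    max_lock_time = RETRY_INTERVAL * tries_limit   # 0.05 * 200 = 10.0 s
    if lock_timeout < max_lock_time:
        lock_timeout = max_lock_time               # adjusted to 10.0 s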
1794
1795      #--------------------------------------------------------------------------------------
1796 - def _createDirIfMissing(self, path):
1797          self._rep_lock.acquire(1)
1798          try:
1799              # creates dirs in path if they do not exist
1800              try:
1801                  self.createDir(path)
1802              except CommandException, e:
1803                  if e.errorCode != 16:
1804                      logger.debug(str(e))
1805                      raise RepositoryError(e = e, msg = str(e))
1806          finally:
1807              self._rep_lock.release()

1808
1809      #--------------------------------------------------------------------------------------
1810 - def _createAttrIfMissing(self, path, schema):
1811          self._rep_lock.acquire(1)
1812          try:
1813              r_table = self._MDStandalone__absolutePath(self.root_dir)
1814              p_table = self._MDStandalone__absolutePath(path)
1815              if p_table != r_table:
1816                  try:
1817                      # try to copy attributes file
1818                      if r_table in self.loaded_tables:
1819                          mdtable = self.loaded_tables[r_table]
1820                          attr_name = mdtable.attributes.blocks[0].name
1821                          attr_dir = mdtable.attributes.storage.dirname
1822                          path_dir = self._MDStandalone__systemPath(p_table)
1823                          shutil.copy(os.path.join(attr_dir, attr_name), path_dir)
1824                  except Exception, e:
1825                      logger.debug(str(e))
1826                      # create attributes in a standard way
1827                      ARDARepositoryMixIn._createAttrIfMissing(self, path, schema)
1828          finally:
1829              self._rep_lock.release()

1830
1831      #--------------------------------------------------------------------------------------
1832 - def _get_iteration_list(self, mdtable):
1833          # returns the list of entry indexes, reordered to start from the last selected row
1834          ii = range(len(mdtable.entries))
1835          return ii[self.__row:] + ii[:self.__row]
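The rotation makes repeated lookups cheap when consecutive requests touch nearby rows: scanning resumes from the last selected row instead of the top of the table. A quick illustration with a hypothetical five-entry table:

    ii = list(range(5))          # entry indexes [0, 1, 2, 3, 4]
    row = 3                      # last selected row (self.__row)
    ii[row:] + ii[:row]          # -> [3, 4, 0, 1, 2]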
1836
1837      #--------------------------------------------------------------------------------------
1838 - def _generic_selectAttr(self, selection, path, attr_list):
1839          self._MDStandalone__initTransaction()
1840          try:
1841              self.rows = []
1842              mdtable = self._MDStandalone__loadTable(path)[0]
1843
1844              indx = []
1845              for k in attr_list:
1846                  if k in mdtable.attributeDict:
1847                      ind = mdtable.attributeDict[k]+1
1848                      indx.append(ind)
1849
1850              f_indx = []
1851              f_res = True
1852              for k in selection:
1853                  if k in mdtable.attributeDict:
1854                      ind = mdtable.attributeDict[k]+1
1855                      f_indx.append((ind,k))
1856                  else:
1857                      f_res = False
1858
1859              def filterer(e):
1860                  res = f_res
1861                  if res:
1862                      for ind, k in f_indx:
1863                          res = res and (e[ind] == selection[k])
1864                          if not res:
1865                              break
1866                  return res
1867
1868              for e in mdtable.entries:
1869                  if filterer(e):
1870                      md = [e[0]] # we always return filename
1871                      for ind in indx:
1872                          md.append(e[ind])
1873                      self.rows.append(md)
1874          finally:
1875              self.releaseAllLocks()
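The filterer above implements a conjunctive match: every key of the selection dictionary must equal the corresponding column of an entry, and a selection key that is missing from the table's attribute dictionary rejects all entries. A self-contained sketch of that rule, with invented table data:

    # Invented layout: each entry is (filename, attr1, attr2, ...);
    # attribute_dict maps attribute name -> column index - 1.
    attribute_dict = {'status': 0, 'application': 1}
    entry = ('42', 'completed', 'Gaudi')
    selection = {'status': 'completed', 'application': 'Gaudi'}

    def matches(e):
        if not all(k in attribute_dict for k in selection):
            return False             # unknown selection key -> no entry matches
        return all(e[attribute_dict[k] + 1] == selection[k] for k in selection)

    matches(entry)                   # -> True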
1876
1877      #--------------------------------------------------------------------------------------
1878 - def _generic_getattr(self, fqid, attrs):
1879          self._MDStandalone__initTransaction()
1880          try:
1881              fn = self._getJobFileName(fqid)
1882              path, entry = os.path.split(fn)
1883              mdtable = self._MDStandalone__loadTable(path)[0]
1884              self.rows = []
1885
1886              indx = []
1887              for k in attrs:
1888                  ind = mdtable.attributeDict[k]+1
1889                  indx.append(ind)
1890
1891              for i in self._get_iteration_list(mdtable):
1892                  e = mdtable.entries[i]
1893                  if e[0] == entry:
1894                      row = []
1895                      row.append(e[0])
1896                      for ind in indx:
1897                          row.append(e[ind])
1898                      self.rows.append(row)
1899                      self.__row = i
1900                      break
1901          finally:
1902              self.releaseAllLocks()

1903
1904      #--------------------------------------------------------------------------------------
1905 - def _generic_updateAttr(self, fqid, attrs, values, forced_action):
1906          self._MDStandalone__initTransaction()
1907          try:
1908              path, entry = os.path.split(self._getJobFileName(fqid))
1909              mdtable = self._MDStandalone__loadTable(path)[0]
1910              for n in self._get_iteration_list(mdtable):
1911                  e = mdtable.entries[n]
1912                  if e[0] == entry:
1913                      lock_ind = mdtable.attributeDict[self._lock[0]]+1
1914                      guid = self._job_cache[fqid]
1915                      if not (forced_action or e[lock_ind] == guid):
1916                          msg = "Job %s cannot be committed because it is probably controlled by another client" % str(fqid)
1917                          raise RepositoryError(msg = msg)
1918                      for i in range(0, len(attrs)):
1919                          e[mdtable.attributeDict[attrs[i]]+1] = values[i]
1920                      mdtable.entries[n] = e
1921                      self.__row = n
1922                      break
1923              else:
1924                  msg = "Job %s is not registered" % str(fqid)
1925                  raise RepositoryError(msg = msg)
1926          finally:
1927              self.releaseAllLocks()

1928
1929      #--------------------------------------------------------------------------------------
1930 - def _generic_rm(self, fqid, forced_action):
1931          self._MDStandalone__initTransaction()
1932          try:
1933              path, entry = os.path.split(self._getJobFileName(fqid))
1934              mdtable = self._MDStandalone__loadTable(path)[0]
1935              for n in self._get_iteration_list(mdtable):
1936                  e = mdtable.entries[n]
1937                  if e[0] == entry:
1938                      lock_ind = mdtable.attributeDict[self._lock[0]]+1
1939                      guid = self._job_cache[fqid]
1940                      if forced_action or e[lock_ind] == guid:
1941                          del mdtable.entries[n]
1942                          self.__row = n
1943                      break
1944              else:
1945                  msg = "Job %s is not registered" % str(fqid)
1946                  raise RepositoryError(msg = msg)
1947          finally:
1948              self.releaseAllLocks()

1949
1950      #--------------------------------------------------------------------------------------
1951 - def _generic_sequenceNext(self, name, reserve = 0):
1952          return self.sequenceNext(name, reserve)

1953
1954      #--------------------------------------------------------------------------------------
1955 - def _initBulkGetAttr(self):
1956          self._initCommand()

1957
1958      #--------------------------------------------------------------------------------------
1959 - def _finalizeBulkGetAttr(self):
1960          try:
1961              try:
1962                  self.abort()
1963              except CommandException, e:
1964                  if e.errorCode != 9:
1965                      raise e
1966          except Exception, e:
1967              msg = str(e)
1968              logger.error(msg)
1969              raise RepositoryError(e = e, msg = msg)

1970
1971      #--------------------------------------------------------------------------------------
1972 - def _initBulkRm(self):
1973          self._initCommand()

1974
1975      #--------------------------------------------------------------------------------------
1976 - def _finalizeBulkRm(self):
1977          self._finalizeCommand()

1978
1979      #--------------------------------------------------------------------------------------
1980 - def removeAllLocks(self):
1981          self._rep_lock.acquire(1)
1982          try:
1983              try:
1984                  MDStandalone.removeAllLocks(self, self._lock_timeout)
1985              except Exception, e:
1986                  logger.debug(str(e))
1987                  Ganga.Utility.logging.log_user_exception(logger)
1988                  raise RepositoryError(e = e, msg = str(e))
1989          finally:
1990              self._rep_lock.release()

1991
1992      #--------------------------------------------------------------------------------------
1993 - def listAllLocks(self):
1994          self._rep_lock.acquire(1)
1995          try:
1996              try:
1997                  return MDStandalone.listAllLocks(self, self._lock_timeout)
1998              except Exception, e:
1999                  logger.debug(str(e))
2000                  Ganga.Utility.logging.log_user_exception(logger)
2001                  raise RepositoryError(e = e, msg = str(e))
2002          finally:
2003              self._rep_lock.release()

2004
2005
2006 ################################################################################
2007 # factory function
2008 -def repositoryFactory(**kwargs):
2009      # main config
2010      config = Ganga.Utility.Config.getConfig('Configuration')
2011
2012      # default repository type
2013      repositoryType = "LocalAMGA"
2014      #repositoryType = config['repositorytype']
2015      assert repositoryType in ['LocalAMGA', 'RemoteAMGA', 'Test']
2016
2017      # synchronize kwargs with the repository configuration
2018      rep_config = all_configs[repositoryType] # repository config
2019      kwargs = kwargs.copy()
2020      for key in rep_config.options.keys():
2021          if key not in kwargs:
2022              kwargs[key] = rep_config[key]
2023
2024      kw_schema = kwargs.get('schema')
2025      if kw_schema is None:
2026          kw_schema = schema[:]
2027
2028      role = kwargs.get('role')
2029      if not role:
2030          role = 'Client'
2031
2032      streamer = kwargs.get('streamer')
2033      if streamer is None:
2034          from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleJobStreamer
2035          streamer = SimpleJobStreamer()
2036
2037      tree_streamer = kwargs.get('tree_streamer')
2038      if tree_streamer is None:
2039          from Ganga.GPIDev.Streamers.SimpleStreamer import SimpleTreeStreamer
2040          tree_streamer = SimpleTreeStreamer()
2041
2042      root_dir = kwargs.get('root_dir')
2043      if not root_dir:
2044          if repositoryType == 'RemoteAMGA':
2045              root_dir = '/'.join(['', 'users', config['user'], __version__])
2046          else:
2047              root_dir = '/'.join(['', __version__])
2048          # a subpath like 'jobs' or 'templates' has to be added to the root dir
2049          subpath = kwargs.get('subpath')
2050          if subpath:
2051              root_dir = '/'.join([root_dir, subpath])
2052
2053      # remove positional arguments from keywords:
2054      for aname in ['schema', 'role', 'streamer', 'tree_streamer', 'root_dir']:
2055          if aname in kwargs:
2056              del kwargs[aname]
2057
2058      info = 'AMGA Job Repository: type=%s ganga_user=%s' % (config['repositorytype'], config['user'])
2059      try:
2060          if repositoryType == 'LocalAMGA':
2061              if not kwargs.get('local_root'):
2062                  # local_root is derived dynamically; it is no longer a configurable parameter
2063                  kwargs['local_root'] = os.path.join(expandfilename(config['gangadir']), 'repository', config['user'], 'LocalAMGA')
2064              info += ' db_location=%s' % kwargs['local_root']
2065              logger.debug("Creating local repository ...")
2066              return LocalARDAJobRepository(kw_schema, role,
2067                                            streamer,
2068                                            tree_streamer,
2069                                            root_dir,
2070                                            **kwargs)
2071
2072          if repositoryType == 'RemoteAMGA':
2073              if not kwargs.get('login'):
2074                  kwargs['login'] = config['user']
2075              info += ' login=%s host=%s port=%s reqSSL=%s' % (kwargs['login'], kwargs['host'], kwargs['port'], kwargs['reqSSL'])
2076              if USE_ORACLE_AS_REMOTE_BACKEND:
2077                  RemoteRepositoryClass = RemoteOracleARDAJobRepository
2078              else:
2079                  RemoteRepositoryClass = RemoteARDAJobRepository
2080              logger.debug("Creating remote repository ...")
2081              return RemoteRepositoryClass(kw_schema, role,
2082                                           streamer,
2083                                           tree_streamer,
2084                                           root_dir,
2085                                           **kwargs)
2086
2087          logger.debug("Creating test repository...")
2088          from TestRepository import TestRepository
2089          if not kwargs.get('local_root'):
2090              # local_root is derived dynamically; it is no longer a configurable parameter
2091              kwargs['local_root'] = os.path.join(expandfilename(config['gangadir']), 'repository', config['user'], 'Test')
2092          info += ' root_dir='+root_dir
2093          return TestRepository(kw_schema, role, streamer, root_dir, **kwargs)
2094
2095      finally:
2096          info += ' root_dir=' + root_dir
2097          logger.info(info)
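A hedged usage sketch of the factory: called with nothing beyond a subpath, it falls back to the LocalAMGA settings, the default schema, and the SimpleStreamer classes, as the code above shows. This assumes a bootstrapped Ganga session so that the 'Configuration' options (user, gangadir, repositorytype) resolve:

    # Assumes a configured Ganga session; 'jobs' is one of the expected subpaths.
    from Ganga.Core.JobRepository.ARDA import repositoryFactory

    rep = repositoryFactory(subpath = 'jobs')    # LocalAMGA repository by default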
2098