Package Ganga :: Package Core :: Package JobRepository :: Module RepositoryMigration42to43
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Core.JobRepository.RepositoryMigration42to43

  1  ################################################################################ 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: RepositoryMigration42to43.py,v 1.1 2008-07-17 16:40:50 moscicki Exp $ 
  5  ################################################################################ 
  6   
  7  import os 
  8  import shutil 
  9  import atexit 
 10  from ARDA import repositoryFactory, __version__ 
 11   
 12  import Ganga.Utility.logging 
 13  from Ganga.Utility.logging import _set_log_level 
 14  logger = Ganga.Utility.logging.getLogger(modulename=1) 
 15  #_set_log_level(logger,'INFO') 
 16   
 17  old_schema = [('id',             'int'), 
 18                ('name',           'varchar(254)'), 
 19                ('status',         'varchar(254)'), 
 20                ('application',    'varchar(254)'), 
 21                ('backend',        'varchar(254)'), 
 22                ('jobBlob',        'text'), 
 23                ('lock',           'varchar(254)')] 
 24   
 25  _migrate_at_once = 1  #number of jobs to migrate in one go 
 26  _all_new_reps = {} 
 27   
 28   
29 -def migrationRepositoryFactory(repositoryType = 'Local', root_dir = '/users', **kwargs):
30 31 def standard_return(): 32 return repositoryFactory(repositoryType = repositoryType, root_dir = root_dir, **kwargs)
33 34 def checkoutJobs(self, ids_or_attributes): 35 # self == old_repository 36 self._rep_lock.acquire(1) 37 try: 38 jobs = [] 39 attr_list = ['id', 'jobBlob', 'lock'] 40 def visitor(ida): 41 res = [] 42 metadata = self._getMetaData(ida, attr_list) 43 for md in metadata: 44 try: 45 jdict = eval(self.text2pstr(md[1])) 46 rr = [] 47 except Exception, e: 48 msg = 'Dictionary of job %s cannot be evaluated because of the error: %s. The job is most likely corrupted and will not be not imported. To permanently delete the job use the command jobs._impl.repository.deleteJobs([%s]).' % (md[0], str(e),md[0]) 49 logger.error(msg) 50 else: 51 res.append([jdict, rr]) 52 return res 53 54 jdicts = map(lambda x: x[0], visitor(ids_or_attributes)) 55 for jdict in jdicts: 56 try: 57 job = self._streamer._getJobFromDict(jdict) 58 except Exception, e: 59 msg = 'Exception: %s while constructing job object from dictionary %s' % (str(e), repr(jdict)) 60 logger.error(msg) 61 else: 62 jobs.append(job) 63 return jobs 64 finally: 65 self._rep_lock.release() 66 67 def registerJobs(self, jobs): 68 # self == new_repository 69 self._rep_lock.acquire(1) 70 try: 71 self._commitJobs(jobs, forced_action = True, deep = True, register = True, get_ids = False) 72 finally: 73 self._rep_lock.release() 74 75 if repositoryType == 'Local': 76 dn,fn = os.path.split(root_dir) 77 dirs = [] 78 while fn: 79 if fn == __version__: 80 break 81 dirs.insert(0,fn) 82 dn,fn = os.path.split(dn) 83 else: 84 logger.warning('Non conventional root directory of the new repository. The old repository will not be migrated') 85 return standard_return() 86 old_root_dir = os.path.join(dn, os.path.join(*dirs)) 87 local_root = kwargs.get('root', '/tmp/') 88 old_path = os.path.join(local_root, old_root_dir) 89 new_path = os.path.join(local_root, root_dir) 90 rep_type = dirs[-1] #jobs or templates 91 if not os.path.isdir(old_path): 92 logger.debug('No old repository found') 93 _all_new_reps[new_path] = False 94 return standard_return() 95 else: 96 _all_new_reps[new_path] = True 97 if os.path.isdir(new_path): 98 logger.debug('New repository has already been created') 99 return standard_return() 100 else: 101 msg ='One of previous versions of repository is found; %s will be migrated to the new repository' % rep_type 102 logger.warning(msg) 103 new_repository = repositoryFactory(repositoryType = repositoryType, 104 root_dir = root_dir, **kwargs) 105 kwargs['schema'] = old_schema[:] 106 kwargs['init_schema'] = False 107 old_repository = repositoryFactory(repositoryType = repositoryType, root_dir = old_root_dir, **kwargs) 108 109 ids = old_repository.getJobIds({}) 110 nn = 0 111 while ids: 112 ids_m = ids[:_migrate_at_once] 113 ids = ids[_migrate_at_once:] 114 try: 115 jobs = checkoutJobs(old_repository, ids_m) 116 except Exception, e: 117 msg = ("Error while getting %s from the old repository: " % rep_type) + str(e) 118 logger.error(msg) 119 else: 120 try: 121 # don't register jobs in incomplete state 122 jobs = filter(lambda j: j.status != "incomplete", jobs) 123 registerJobs(new_repository, jobs) 124 except Exception, e: 125 msg = ("Error while saving old %s in the new repository: " % rep_type) + str(e) 126 logger.error(msg) 127 else: 128 nn+=len(jobs) 129 130 import Ganga.GPIDev.Lib.JobTree.JobTree ## has to be in GPI before conversion 131 try: 132 job_tree = old_repository.getJobTree() 133 if job_tree: 134 new_repository.setJobTree(job_tree) 135 except Exception, e: 136 msg = "Error while saving old jobtree in the new repository: " + str(e) 137 logger.error(msg) 138 if nn > 0: 139 msg = '%d %s have been migrated to the new repository. To revert the migration use "revert_migration_v42tov43()" command' % (nn, rep_type) 140 logger.warning(msg) 141 return new_repository 142 else: 143 logger.debug('Nothing to migrate. No support for the old remote repository') 144 return standard_return() 145 146 147 __revert_migration = False #to support the trick with handlers order (see below atexit) 148
149 -def _revert_migration_v42tov43():
150 if __revert_migration: 151 for path in _all_new_reps: 152 if _all_new_reps[path]: 153 shutil.rmtree(path)
154 155 atexit.register(_revert_migration_v42tov43) #this is a trick: always register to revert handler to put it before any handlers in the repository 156
157 -def revert_migration_v42tov43():
158 'undo repository migration (4.2->4.3)' 159 global __revert_migration 160 __revert_migration = True 161 msg = 'The job repository will be reverted to the old one at the end of the ganga session' 162 logger.warning(msg)
163