1
2
3
4
5
6
7 import os
8 import shutil
9 import atexit
10 from ARDA import repositoryFactory, __version__
11
12 import Ganga.Utility.logging
13 from Ganga.Utility.logging import _set_log_level
14 logger = Ganga.Utility.logging.getLogger(modulename=1)
15
16
17 old_schema = [('id', 'int'),
18 ('name', 'varchar(254)'),
19 ('status', 'varchar(254)'),
20 ('application', 'varchar(254)'),
21 ('backend', 'varchar(254)'),
22 ('jobBlob', 'text'),
23 ('lock', 'varchar(254)')]
24
25 _migrate_at_once = 1
26 _all_new_reps = {}
27
28
30
def standard_return():
    """Create and return the requested repository without any migration.

    repositoryType, root_dir and kwargs are not defined in this function;
    presumably they are the arguments of the enclosing function (its header
    is not visible here -- TODO confirm).
    """
    plain_repository = repositoryFactory(repositoryType=repositoryType,
                                         root_dir=root_dir,
                                         **kwargs)
    return plain_repository
33
def checkoutJobs(self, ids_or_attributes):
    """Read jobs from a repository, skipping the ones that cannot be rebuilt.

    The repository lock is held for the whole operation and is always
    released, even when reading the metadata fails.

    Args:
        self: repository object providing _rep_lock, _getMetaData,
            text2pstr and _streamer._getJobFromDict.
        ids_or_attributes: selection passed unchanged to self._getMetaData.

    Returns:
        List of job objects built by self._streamer._getJobFromDict. A job
        whose stored dictionary cannot be evaluated, or whose object cannot
        be constructed, is reported through logger.error and left out.
    """
    attr_list = ['id', 'jobBlob', 'lock']
    self._rep_lock.acquire(1)
    try:
        # First pass: decode every stored blob into a job dictionary.
        # Each metadata row is indexed as md[0] (job id) and md[1] (blob);
        # presumably rows follow the order of attr_list -- TODO confirm.
        jdicts = []
        for md in self._getMetaData(ids_or_attributes, attr_list):
            try:
                # SECURITY NOTE(review): eval() executes whatever expression
                # is stored in the repository; this is only acceptable while
                # the repository content is trusted.
                jdict = eval(self.text2pstr(md[1]))
            except Exception as e:
                msg = 'Dictionary of job %s cannot be evaluated because of the error: %s. The job is most likely corrupted and will not be imported. To permanently delete the job use the command jobs._impl.repository.deleteJobs([%s]).' % (md[0], str(e), md[0])
                logger.error(msg)
            else:
                jdicts.append(jdict)

        # Second pass: turn the dictionaries into job objects.
        jobs = []
        for jdict in jdicts:
            try:
                job = self._streamer._getJobFromDict(jdict)
            except Exception as e:
                msg = 'Exception: %s while constructing job object from dictionary %s' % (str(e), repr(jdict))
                logger.error(msg)
            else:
                jobs.append(job)
        return jobs
    finally:
        self._rep_lock.release()
66
def registerJobs(self, jobs):
    """Commit the given jobs to a repository as newly registered jobs.

    Args:
        self: repository object providing _rep_lock and _commitJobs.
        jobs: jobs handed unchanged to self._commitJobs.

    The repository lock is held during the commit and released afterwards,
    also when the commit raises.
    """
    commit_options = {
        'forced_action': True,
        'deep': True,
        'register': True,
        'get_ids': False,
    }
    self._rep_lock.acquire(1)
    try:
        self._commitJobs(jobs, **commit_options)
    finally:
        self._rep_lock.release()
74
# Main body: migrate the jobs of an old 'Local' repository into the new one.
# NOTE(review): the enclosing function header is not visible in this part of
# the file and the nesting below is reconstructed from the statement order;
# in particular the final 'else' is assumed to belong to the repositoryType
# test (its message mentions the remote repository) -- confirm.
if repositoryType == 'Local':
    # Strip path components from the end of root_dir until the component
    # equal to __version__ is reached; the stripped ones are kept in dirs.
    dn,fn = os.path.split(root_dir)
    dirs = []
    while fn:
        if fn == __version__:
            break
        dirs.insert(0,fn)
        dn,fn = os.path.split(dn)
    else:
        # Reached only when the loop ended without 'break', i.e. no path
        # component matched __version__.
        logger.warning('Non conventional root directory of the new repository. The old repository will not be migrated')
        return standard_return()
    # Old location: root_dir with the __version__ component removed.
    # NOTE(review): if root_dir ends with __version__ then dirs is empty and
    # both os.path.join(*dirs) and dirs[-1] below raise.
    old_root_dir = os.path.join(dn, os.path.join(*dirs))
    local_root = kwargs.get('root', '/tmp/')
    old_path = os.path.join(local_root, old_root_dir)
    new_path = os.path.join(local_root, root_dir)
    # Last path component of root_dir; only used in the log messages below.
    rep_type = dirs[-1]
    if not os.path.isdir(old_path):
        logger.debug('No old repository found')
        _all_new_reps[new_path] = False
        return standard_return()
    else:
        _all_new_reps[new_path] = True
        if os.path.isdir(new_path):
            # An existing new directory is taken as "already migrated".
            logger.debug('New repository has already been created')
            return standard_return()
        else:
            msg ='One of previous versions of repository is found; %s will be migrated to the new repository' % rep_type
            logger.warning(msg)
            new_repository = repositoryFactory(repositoryType = repositoryType,
                                               root_dir = root_dir, **kwargs)
            # 'schema' and 'init_schema' are added only after the new
            # repository has been created, so they affect the old one only.
            kwargs['schema'] = old_schema[:]
            kwargs['init_schema'] = False
            old_repository = repositoryFactory(repositoryType = repositoryType, root_dir = old_root_dir, **kwargs)

            ids = old_repository.getJobIds({})
            # Number of jobs successfully written to the new repository.
            nn = 0
            # Move the jobs over in batches of _migrate_at_once ids; a batch
            # that fails is logged and skipped, the loop carries on.
            while ids:
                ids_m = ids[:_migrate_at_once]
                ids = ids[_migrate_at_once:]
                try:
                    jobs = checkoutJobs(old_repository, ids_m)
                except Exception, e:
                    msg = ("Error while getting %s from the old repository: " % rep_type) + str(e)
                    logger.error(msg)
                else:
                    try:

                        # Jobs in status "incomplete" are not migrated.
                        jobs = filter(lambda j: j.status != "incomplete", jobs)
                        registerJobs(new_repository, jobs)
                    except Exception, e:
                        msg = ("Error while saving old %s in the new repository: " % rep_type) + str(e)
                        logger.error(msg)
                    else:
                        nn+=len(jobs)

            # NOTE(review): the imported module is not referenced by name in
            # the visible code; presumably it is needed for its import side
            # effects when the job tree is read -- confirm.
            import Ganga.GPIDev.Lib.JobTree.JobTree
            # Copy the job tree as well; a failure is logged, not raised.
            try:
                job_tree = old_repository.getJobTree()
                if job_tree:
                    new_repository.setJobTree(job_tree)
            except Exception, e:
                msg = "Error while saving old jobtree in the new repository: " + str(e)
                logger.error(msg)
            if nn > 0:
                msg = '%d %s have been migrated to the new repository. To revert the migration use "revert_migration_v42tov43()" command' % (nn, rep_type)
                logger.warning(msg)
            return new_repository
else:
    # Any repository type other than 'Local' is created without migration.
    logger.debug('Nothing to migrate. No support for the old remote repository')
    return standard_return()
145
146
# Module-level flag, initially False. It is not read anywhere in the visible
# part of this file.
# NOTE(review): presumably consulted by _revert_migration_v42tov43 -- confirm.
__revert_migration = False


# Have _revert_migration_v42tov43 called when the interpreter exits.
# NOTE(review): the definition of that function is not visible in this part
# of the file.
atexit.register(_revert_migration_v42tov43)
156
163