1
2
3
4
5
6
7
8
9
10
11
12 import re
13
14 from types import *
15
16 from Ganga.GPIDev.Base import GangaObject
17 from Ganga.GPIDev.Schema import *
18 from Ganga.GPIDev.Lib.File import *
19 from Ganga.GPIDev.Credentials import getCredential
20
21 from Ganga.Utility.logging import getLogger
22 from Ganga.Lib.LCG.Utility import *
23
24 from Ganga.Utility.ColourText import ANSIMarkup, NoMarkup, Foreground, Effects
25
27
28 '''
29 Data object for indexing a file on the grid.
30
31 @author: Hurng-Chun Lee
32 @contact: hurngchunlee@gmail.com
33 '''
34
35 _schema = Schema(Version(1,0), {
36 'id' : SimpleItem(defvalue='', doc='the main identity of the file'),
37 'name' : SimpleItem(defvalue='', doc='the name of the file'),
38 'md5sum' : SimpleItem(defvalue='', doc='the md5sum of the file'),
39 'attributes' : SimpleItem(defvalue={}, doc='a key:value pairs of file metadata')
40 })
41
42 _category = 'GridFileIndex'
43 _name = 'GridFileIndex'
44
45 logger = getLogger()
46
49
52
54
55 '''
56 Helper class for uploading/downloading/deleting sandbox files on a grid cache.
57
58 @author: Hurng-Chun Lee
59 @contact: hurngchunlee@gmail.com
60 '''
61
62 _schema = Schema(Version(1,1), {
63 'vo' : SimpleItem(defvalue='dteam', hidden=1, copyable=0, doc='the Grid virtual organization'),
64 'middleware' : SimpleItem(defvalue='EDG', hidden=1, copyable=1, doc='the LCG middleware type'),
65 'protocol' : SimpleItem(defvalue='', copyable=1, doc='file transfer protocol'),
66 'max_try' : SimpleItem(defvalue=1, doc='max. number of tries in case of failures'),
67 'timeout' : SimpleItem(defvalue=180, copyable=0, hidden=1, doc='transfer timeout in seconds'),
68 'uploaded_files' : ComponentItem('GridFileIndex', defvalue=[], sequence=1, protected=1, copyable=0, hidden=1, doc='a repository record for the uploaded files')
69 })
70
71 _category = 'GridSandboxCache'
72 _name = 'GridSandboxCache'
73 _exportmethods = ['upload', 'download', 'delete', 'get_cached_files', 'list_cached_files', 'cleanup']
74
75 logger = getLogger()
76
79
def upload(self, files=None, opts=''):
    """
    Uploads multiple files to a remote grid storage.

    @param files is a list of local files to be uploaded to the grid.
                 Each element can be a file path (str) or a File object;
                 any other element is skipped with a warning, which makes
                 the whole upload report failure. Defaults to no files.
    @param opts is an option string passed unchanged to impl_upload and
                impl_bookkeepUploadedFiles.

    @return the result of impl_bookkeepUploadedFiles if every file was
            uploaded; otherwise False (and nothing is bookkept)
    """
    if files is None:
        files = []

    # impl_upload expects file:// URLs, so normalise each supported element.
    paths = []
    for f in files:
        if f.__class__.__name__ == 'File':
            paths.append('file://%s' % f.name)
        elif f.__class__.__name__ == 'str':
            paths.append('file://%s' % f)
        else:
            logger.warning('unknown file expression: %s' % repr(f))

    uploaded_files = self.impl_upload(files=paths, opts=opts)

    # Compare against the caller's list (not `paths`) so that skipped or
    # failed entries are reported as an unsuccessful upload.
    if len(uploaded_files) != len(files):
        return False

    # Bookkeep exactly once: a second call with append=True could record
    # the same uploads twice in an implementation that honours the flag.
    return self.impl_bookkeepUploadedFiles(uploaded_files, append=True, opts=opts)
113
def download(self, files=[], dest_dir=None, opts=''):
    """
    Fetch previously uploaded grid files into a local directory.

    A successfully downloaded file is placed at
    os.path.join(dest_dir, os.path.basename(local_fname_n)).

    @param files     list of file indexes to fetch, i.e.
                     [index_grid_file_1, index_grid_file_2, ...]
    @param dest_dir  local directory that receives the downloaded files
    @param opts      option string handed to impl_download

    @return True when impl_download reports every matched cached file as
            downloaded, False otherwise
    """
    # Resolve the requested indexes into the cached GridFileIndex records.
    wanted = self.__get_file_index_objects__(files)
    fetched = self.impl_download(files=wanted, dest_dir=dest_dir, opts=opts)

    if len(fetched) != len(wanted):
        logger.warning('some files not successfully downloaded')
        return False
    return True
141
def delete(self, files=[], opts=''):
    """
    Remove previously uploaded files from the remote grid storages.

    @param files  list of file indexes to remove, i.e.
                  [index_grid_file_1, index_grid_file_2, ...]
    @param opts   option string handed to impl_delete

    @return True when impl_delete reports every matched cached file as
            deleted, False otherwise
    """
    # Resolve the requested indexes into the cached GridFileIndex records.
    targets = self.__get_file_index_objects__(files)
    removed = self.impl_delete(files=targets, opts=opts)

    if len(removed) != len(targets):
        logger.warning('some files not successfully deleted')
        return False
    return True
162
164 """
165 Cleans up the uploaded files.
166
167 @return True if all grid files listed in the index file are successfully deleted.
168 """
169 status = False
170
171 all_files = self.get_cached_files()
172
173 f_ids = []
174 for f in all_files:
175 f_ids.append(f.id)
176
177 return self.delete(files=f_ids)
178
180 """
181 Gets the indexes of the uploaded files on the grid.
182
183 @return a list of GridFileIndex objects indexing the uploaded files on the grid.
184 The id of each object is the main index (e.g. GUID) of the grid file.
185 """
186 return self.impl_getUploadedFiles(opts=opts)
187
189 """
190 Lists the uploaded files.
191
192 If loop is True, it also prints the uploaded files associated with subjobs.
193 """
194
195 fc = 0
196 ds = ''
197
198 doColoring = True
199
200 fg = Foreground()
201 fx = Effects()
202
203 status_colors = {'inuse':fg.orange,
204 'free' :fg.blue,
205 'gone' :fg.red}
206
207 status_mapping = {'new' : 'inuse',
208 'submitted' : 'inuse',
209 'submitting':'inuse',
210 'running' : 'inuse',
211 'completed' : 'free',
212 'completing': 'free',
213 'failed' : 'free',
214 'killed' : 'free'}
215
216 if doColoring:
217 markup = ANSIMarkup()
218 else:
219 markup = NoMarkup()
220
221 def __markup_by_status__(fileIndex, counter, status):
222
223 fmtStr = '\n%4d\t%-30s\t%-12s\t%s' % (counter, fileIndex.name, status, fileIndex.id)
224
225 try:
226 return markup(fmtStr, status_colors[status])
227 except KeyError:
228 return markup(fmtStr, fx.normal)
229
230 j = self.getJobObject()
231
232 for f in self.get_cached_files(opts=opts):
233
234 my_status = 'unknown'
235
236 if j:
237 try:
238 my_status = status_mapping[j.status]
239 except KeyError:
240 pass
241
242 ds += __markup_by_status__(f, fc, my_status)
243
244 fc += 1
245
246 if j and loop:
247 for sj in j.subjobs:
248 for f in sj.backend.sandboxcache.get_cached_files(opts=opts):
249
250 my_status = 'unknown'
251
252 try:
253 my_status = status_mapping[sj.status]
254 except KeyError:
255 pass
256
257 ds += __markup_by_status__(f, fc, my_status)
258
259 fc += 1
260
261 return ds
262
263
265 """
266 Uploads multiple files to a remote grid storage.
267
268 @param files is a list of files in URL format (i.e. file://...)
269
270 @return a list of successfully uploaded files represented by GridFileIndex objects
271 """
272 raise NotImplementedError
273
275 """
276 Downloads multiple files from remote grid storages to
277 a local directory.
278
279 @param files is a list of files represented by GridFileIndex objects
280 @param dest_dir is a local destination directory to store the downloaded files.
281
282 @return a list of successfully downloaded files represented by GridFileIndex objects
283 """
284 raise NotImplementedError
285
287 """
288 Deletes multiple files from remote grid storages.
289
290 @param files is a list of files represented by GridFileIndex objects
291 @return a list of successfully deleted files represented by GridFileIndex objects
292 """
293 raise NotImplementedError
294
296 """
297 basic implementation for bookkeeping the uploaded files.
298 It simply keeps the GridFileIndex objects in the job repository.
299
300 @param files is a list of files represented by GridFileIndex objects
301 @return True if files are successfully logged in the local index file
302 """
303
304 self.uploaded_files = files
305
306 return True
307
309 """
310 basic implementation for getting the previously uploaded files from the
311 job repository.
312
313 @return a list of files represented by GridFileIndex objects
314 """
315 files = self.uploaded_files
316
317 return files
318
319
321 '''Gets file index object according to the given file list
322 - try to get the GridFileIndex object from the local index file.
323
324 @param files is a list of file indexes
325 @return a list of files represented by GridFileIndex objects
326 '''
327
328 cachedFiles = self.get_cached_files()
329 myFiles = []
330 for f in cachedFiles:
331 if f.id in files:
332 myFiles.append(f)
333
334 return myFiles
335
337 '''gets a unique filename'''
338 cred = getCredential('GridProxy',self.middleware)
339 uid = re.sub(r'[\:\-\(\)]{1,}','',cred.identity()).lower()
340 fname = 'user.%s.%s' % (uid, get_uuid())
341 return fname
342
344 '''Executing system command with retry feature'''
345 i = 0
346 rc = 0
347 output = None
348 m = None
349 try_again = True
350 while try_again:
351 i = i + 1
352 self.logger.debug('run cmd: %s' % cmd)
353 rc, output, m = shell.cmd1(cmd,allowed_exit=[0,255])
354 if rc in [0,255]:
355 try_again = False
356 elif i == maxRetry:
357 try_again = False
358 else:
359 self.logger.warning("trial %d: error: %s" % (i,output))
360
361 return (rc, output, m)
362