Package Ganga :: Package Lib :: Package LCG :: Module GridSandboxCache
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.LCG.GridSandboxCache

  1  ############################################################################### 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: GridSandboxCache.py,v 1.10 2009-07-16 10:41:17 hclee Exp $ 
  5  ############################################################################### 
  6  # 
  7  # LCG backend 
  8  # 
  9  # ATLAS/ARDA 
 10  # 
 11  # Date:   January 2007 
 12  import re 
 13   
 14  from types import * 
 15   
 16  from Ganga.GPIDev.Base import GangaObject 
 17  from Ganga.GPIDev.Schema import * 
 18  from Ganga.GPIDev.Lib.File import * 
 19  from Ganga.GPIDev.Credentials import getCredential  
 20   
 21  from Ganga.Utility.logging import getLogger 
 22  from Ganga.Lib.LCG.Utility import * 
 23   
 24  from Ganga.Utility.ColourText import ANSIMarkup, NoMarkup, Foreground, Effects 
 25   
class GridFileIndex(GangaObject):

    '''
    Data object for indexing a file on the grid.

    Two indexes compare equal when their 'id' fields are equal; the other
    schema fields (name, md5sum, attributes) take no part in the comparison.

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    '''

    _schema = Schema(Version(1,0), {
        'id'         : SimpleItem(defvalue='', doc='the main identity of the file'),
        'name'       : SimpleItem(defvalue='', doc='the name of the file'),
        'md5sum'     : SimpleItem(defvalue='', doc='the md5sum of the file'),
        'attributes' : SimpleItem(defvalue={}, doc='a key:value pairs of file metadata')
    })

    _category = 'GridFileIndex'
    _name = 'GridFileIndex'

    logger = getLogger()

    def __init__(self):
        super(GridFileIndex,self).__init__()

    def __eq__(self, other):
        '''
        Compares this index with another object by 'id'.

        @param other is the object to compare with.

        @return True if other exposes an 'id' equal to this one; False
                otherwise, including when other has no 'id' attribute at all.
        '''
        try:
            return other.id == self.id
        except AttributeError:
            # e.g. a comparison against None or a plain string: not equal,
            # rather than an exception escaping from '=='.
            return False

    def __ne__(self, other):
        '''
        Negation of __eq__.

        Python 2 does not derive '!=' from __eq__, so it is spelled out to
        keep both operators in agreement.
        '''
        return not self.__eq__(other)
52
class GridSandboxCache(GangaObject):

    '''
    Helper class for uploading/downloading/deleting sandbox files on a grid cache.

    The public methods (upload, download, delete, cleanup, get_cached_files,
    list_cached_files) normalise their arguments and delegate the real work
    to the impl_* methods. In this class impl_upload, impl_download and
    impl_delete raise NotImplementedError and are to be implemented in the
    child classes; the two bookkeeping impl_* methods have a basic
    implementation based on the 'uploaded_files' schema item.

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    '''

    _schema = Schema(Version(1,1), {
        'vo'             : SimpleItem(defvalue='dteam', hidden=1, copyable=0, doc='the Grid virtual organization'),
        'middleware'     : SimpleItem(defvalue='EDG', hidden=1, copyable=1, doc='the LCG middleware type'),
        'protocol'       : SimpleItem(defvalue='', copyable=1, doc='file transfer protocol'),
        'max_try'        : SimpleItem(defvalue=1, doc='max. number of tries in case of failures'),
        'timeout'        : SimpleItem(defvalue=180, copyable=0, hidden=1, doc='transfer timeout in seconds'),
        'uploaded_files' : ComponentItem('GridFileIndex', defvalue=[], sequence=1, protected=1, copyable=0, hidden=1, doc='a repository record for the uploaded files')
    })

    _category = 'GridSandboxCache'
    _name = 'GridSandboxCache'
    _exportmethods = ['upload', 'download', 'delete', 'get_cached_files', 'list_cached_files', 'cleanup']

    logger = getLogger()

    def __init__(self):
        super(GridSandboxCache,self).__init__()

    def upload(self, files=None, opts=''):
        """
        Uploads multiple files to a remote grid storage.

        @param files is a list of local files to be uploaded to the grid.
               The element can be a file path or a File object.
        @param opts is handed over unchanged to impl_upload and
               impl_bookkeepUploadedFiles.

        @return True if files are successfully uploaded; otherwise it returns False
        """
        if files is None:
            files = []

        paths = []
        for f in files:
            # elements are recognised by the name of their class
            kind = f.__class__.__name__
            if kind == 'File':
                paths.append('file://%s' % f.name)
            elif kind == 'str':
                paths.append('file://%s' % f)
            else:
                self.logger.warning('unknown file expression: %s' % repr(f))

        uploaded_files = self.impl_upload(files=paths, opts=opts)

        # The count is checked against the requested files, not against
        # 'paths': an element skipped above therefore makes the upload fail.
        if len(uploaded_files) != len(files):
            return False

        # bookkeeping is done exactly once per upload
        return self.impl_bookkeepUploadedFiles(uploaded_files, append=True, opts=opts)

    def download(self, files=None, dest_dir=None, opts=''):
        """
        Downloads multiple files from remote grid storages to
        a local directory.

        If the file is successfully downloaded, the local file path would be:

            - os.path.join(dest_dir, os.path.basename(local_fname_n))

        @param files is a list of files to be downloaded from the grid.
               The data format of it should be:
                   - [index_grid_file_1, index_grid_file_2, ...]
               Only the indexes matching the 'id' of a cached file are
               considered; the others are dropped before the download.

        @param dest_dir is a local destination directory to store the downloaded files.
        @param opts is handed over unchanged to impl_download.

        @return True if files are successfully downloaded; otherwise it returns False
        """
        myFiles = self.__get_file_index_objects__(files)
        downloadedFiles = self.impl_download(files=myFiles, dest_dir=dest_dir, opts=opts)

        if len(downloadedFiles) == len(myFiles):
            return True

        self.logger.warning('some files not successfully downloaded')
        return False

    def delete(self, files=None, opts=''):
        """
        Deletes multiple files from remote grid storages.

        @param files is a list of files to be deleted from the grid.
               The data format of it should be:
                   - [index_grid_file_1, index_grid_file_2, ...]
               Only the indexes matching the 'id' of a cached file are
               considered; the others are dropped before the deletion.
        @param opts is handed over unchanged to impl_delete.

        @return True if files are successfully deleted; otherwise it returns False
        """
        myFiles = self.__get_file_index_objects__(files)
        deletedFiles = self.impl_delete(files=myFiles, opts=opts)

        if len(deletedFiles) == len(myFiles):
            return True

        self.logger.warning('some files not successfully deleted')
        return False

    def cleanup(self, opts=''):
        """
        Cleans up the uploaded files.

        @param opts is handed over to delete().

        @return True if all grid files listed in the index file are successfully deleted.
        """
        f_ids = [f.id for f in self.get_cached_files()]

        return self.delete(files=f_ids, opts=opts)

    def get_cached_files(self, opts=''):
        """
        Gets the indexes of the uploaded files on the grid.

        @param opts is handed over unchanged to impl_getUploadedFiles.

        @return whatever impl_getUploadedFiles returns; with the basic
                implementation of this class that is the content of the
                'uploaded_files' schema item.
        """
        return self.impl_getUploadedFiles(opts=opts)

    def list_cached_files(self, loop=True, opts=''):
        """
        Lists the uploaded files.

        Each file gives one line made of a running counter, the file name,
        a usage status derived from the status of the owning job and the
        file id. The line is marked up with a colour picked by that status.

        if loop = True, it prints also the uploaded files associated with subjobs.

        @param opts is handed over unchanged to get_cached_files.

        @return the listing as one string (nothing is printed here).
        """
        fg = Foreground()
        fx = Effects()

        status_colors = {'inuse':fg.orange,
                         'free' :fg.blue,
                         'gone' :fg.red}

        status_mapping = {'new'       : 'inuse',
                          'submitted' : 'inuse',
                          'submitting': 'inuse',
                          'running'   : 'inuse',
                          'completed' : 'free',
                          'completing': 'free',
                          'failed'    : 'free',
                          'killed'    : 'free'}

        # hard-wired switch; set to False to get the plain text listing
        doColoring = True

        if doColoring:
            markup = ANSIMarkup()
        else:
            markup = NoMarkup()

        lines = []

        def __append_listing__(cache, job_status):
            # a job status without mapping (or no job at all) shows as
            # 'unknown', which has no colour of its own and gets fx.normal
            my_status = status_mapping.get(job_status, 'unknown')
            for f in cache.get_cached_files(opts=opts):
                # len(lines) is the running counter shared by job and subjobs
                fmtStr = '\n%4d\t%-30s\t%-12s\t%s' % (len(lines), f.name, my_status, f.id)
                lines.append(markup(fmtStr, status_colors.get(my_status, fx.normal)))

        j = self.getJobObject()

        if j:
            __append_listing__(self, j.status)
        else:
            __append_listing__(self, None)

        if j and loop:
            for sj in j.subjobs:
                __append_listing__(sj.backend.sandboxcache, sj.status)

        return ''.join(lines)

    ## methods to be implemented in the child classes
    def impl_upload(self, files=None, opts=''):
        """
        Uploads multiple files to a remote grid storage.

        @param files is a list of files in URL format (i.e. file://...)

        @return a list of successfully uploaded files represented by GridFileIndex objects
        """
        raise NotImplementedError

    def impl_download(self, files=None, dest_dir=None, opts=''):
        """
        Downloads multiple files from remote grid storages to
        a local directory.

        @param files is a list of files represented by GridFileIndex objects
        @param dest_dir is a local destination directory to store the downloaded files.

        @return a list of successfully downloaded files represented by GridFileIndex objects
        """
        raise NotImplementedError

    def impl_delete(self, files=None, opts=''):
        """
        Deletes multiple files from remote grid storages.

        @param files is a list of files represented by GridFileIndex objects
        @return a list of successfully deleted files represented by GridFileIndex objects
        """
        raise NotImplementedError

    def impl_bookkeepUploadedFiles(self, files=None, append=True, opts=''):
        """
        basic implementation for bookkeeping the uploaded files.
        It simply keeps the GridFileIndex objects in the job repository.

        NOTE: this basic implementation replaces the current record with
        the given files; 'append' and 'opts' are accepted but not used here.

        @param files is a list of files represented by GridFileIndex objects
        @return True if files are successfully logged in the local index file
        """
        if files is None:
            # a fresh list per call, so that no shared default object ends
            # up stored on the instance
            files = []

        self.uploaded_files = files

        return True

    def impl_getUploadedFiles(self, opts=''):
        """
        basic implementation for getting the previously uploaded files from the
        job repository.

        @param opts is accepted but not used by this basic implementation.

        @return a list of files represented by GridFileIndex objects
        """
        return self.uploaded_files

    ## private methods
    def __get_file_index_objects__(self, files=None):
        '''Gets file index object according to the given file list
           - try to get the GridFileIndex object from the local index file.

        The result follows the order of the cached files, not the order
        of the given list.

        @param files is a list of file indexes
        @return a list of files represented by GridFileIndex objects
        '''
        if files is None:
            files = []

        return [f for f in self.get_cached_files() if f.id in files]

    def __get_unique_fname__(self):
        '''gets an unique filename

        The name has the form 'user.<uid>.<uuid>': <uid> is the identity of
        the GridProxy credential in lower case with any ':', '-', '(' and
        ')' removed, <uuid> is the value returned by get_uuid().
        '''
        cred = getCredential('GridProxy',self.middleware)
        uid = re.sub(r'[\:\-\(\)]{1,}','',cred.identity()).lower()

        return 'user.%s.%s' % (uid, get_uuid())

    def __cmd_retry_loop__(self,shell,cmd,maxRetry=3):
        '''Executing system command with retry feature

        The command is run at least once. It is run again as long as the
        exit code is neither 0 nor 255 and fewer than maxRetry trials have
        been made.

        @param shell is the object providing cmd1() to run the command.
        @param cmd is the command to run.
        @param maxRetry is the max. number of trials.

        @return the (rc, output, m) tuple given by the last call of shell.cmd1
        '''
        i = 0
        while True:
            i = i + 1
            self.logger.debug('run cmd: %s' % cmd)
            rc, output, m = shell.cmd1(cmd,allowed_exit=[0,255])

            # '>=' rather than '==': a maxRetry of 0 or less must not keep
            # the loop running forever on a persistent failure
            if rc in [0,255] or i >= maxRetry:
                break

            self.logger.warning("trial %d: error: %s" % (i,output))

        return (rc, output, m)
362