Package Ganga :: Package Lib :: Package LCG :: Module LCGSandboxCache
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.LCG.LCGSandboxCache

  1  ############################################################################### 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: LCGSandboxCache.py,v 1.8 2009-03-12 12:17:31 hclee Exp $ 
  5  ############################################################################### 
  6  # 
  7  # LCG backend 
  8  # 
  9  # ATLAS/ARDA 
 10  # 
 11  # Date:   January 2007 
 12  import os 
 13  import os.path 
 14  import re 
 15  from types import * 
 16  from urlparse import urlparse 
 17   
 18  from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm   
 19   
 20  from Ganga.GPIDev.Schema import * 
 21  from Ganga.GPIDev.Lib.File import * 
 22   
 23  from Ganga.Utility.logging import getLogger 
 24  from Ganga.Utility.GridShell import getShell  
 25   
 26  from Ganga.Lib.LCG.GridSandboxCache import GridSandboxCache, GridFileIndex 
 27  from Ganga.Lib.LCG.Utility import * 
 28   
 29  lcg_sandbox_cache_schema_datadict = GridSandboxCache._schema.inherit_copy().datadict 
 30  lcg_file_index_schema_datadict    = GridFileIndex._schema.inherit_copy().datadict 
 31   
class LCGFileIndex(GridFileIndex):
    """
    File index record for a sandbox file handled by the LCG sandbox cache.

    On top of the items taken over from the GridFileIndex schema, every
    record carries two extra items:

      - lfc_host:    the LFC hostname
      - local_fpath: the original file path on the local machine

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    """

    _category = 'GridFileIndex'
    _name = 'LCGFileIndex'

    # Register the LCG-specific items first; the schema below is then built
    # from the extended item dictionary.
    lcg_file_index_schema_datadict.update({
        'lfc_host': SimpleItem(defvalue='', copyable=1, doc='the LFC hostname'),
        'local_fpath': SimpleItem(defvalue='', copyable=1, doc='the original file path on local machine'),
    })
    _schema = Schema(Version(1, 0), lcg_file_index_schema_datadict)

    def __init__(self):
        """Initialise the record through the parent class constructor."""
        super(LCGFileIndex, self).__init__()
51
class LCGSandboxCache(GridSandboxCache):
    '''
    Helper class for uploading/downloading/deleting sandbox files using
    the lcg-cr/lcg-cp/lcg-del commands.

    Each operation builds one command line per file and runs the commands
    through an MTRunner, using a small Algorithm subclass whose process()
    method handles a single file.

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    '''

    lcg_sandbox_cache_schema_datadict.update({
        'se'        : SimpleItem(defvalue='', copyable=1, doc='the LCG SE hostname'),
        'se_type'   : SimpleItem(defvalue='srmv2', copyable=1, doc='the LCG SE type'),
        'se_rpath'  : SimpleItem(defvalue='generated', copyable=1, doc='the relative path to the VO directory on the SE'),
        'lfc_host'  : SimpleItem(defvalue='', copyable=1, doc='the LCG LFC hostname'),
        'srm_token' : SimpleItem(defvalue='', copyable=1, doc='the SRM space token, meaningful only when se_type is set to srmv2')
    })

    _schema   = Schema(Version(1, 0), lcg_sandbox_cache_schema_datadict)
    _category = 'GridSandboxCache'
    _name     = 'LCGSandboxCache'

    logger = getLogger()

    def __init__(self):
        """Initialise the parent cache object and mark the protocol as 'lcg'."""
        super(LCGSandboxCache, self).__init__()
        self.protocol = 'lcg'

    def __setattr__(self, attr, value):
        """
        Set an attribute, validating 'se_type' first.

        @raise AttributeError: if attr is 'se_type' and value is not one of
                               '', 'srmv1', 'srmv2' or 'se'.
        """
        if attr == 'se_type' and value not in ['', 'srmv1', 'srmv2', 'se']:
            raise AttributeError('invalid se_type: %s' % value)
        super(LCGSandboxCache, self).__setattr__(attr, value)

    def impl_upload(self, files=None, opts=''):
        """
        Uploads multiple files to a remote grid storage.

        @param files: the files to upload; each entry is passed through
                      urlparse and its path component is used as the local
                      file path. Defaults to no files.
        @param opts:  not used by this implementation.
        @return: the values of the runner's result table, i.e. one
                 LCGFileIndex per file for which a guid was obtained.
        """
        # A fresh list per call: the collection is handed over to Data/MTRunner,
        # so a shared mutable default must not be used here.
        if files is None:
            files = []

        shell = getShell(self.middleware)

        if self.lfc_host:
            shell.env['LFC_HOST'] = self.lfc_host

        # LFC_HOST may be absent when no lfc_host is configured; a debug message
        # must not abort the upload with a KeyError.
        # NOTE(review): assumes shell.env is a dict-like mapping - confirm.
        self.logger.debug('upload file with LFC_HOST: %s', shell.env.get('LFC_HOST'))

        ## the algorithm of uploading one file
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj
                # one common directory name for all files of this upload
                self.dirname = self.cacheObj.__get_unique_fname__()

            def process(self, file):
                """
                Register one file with lcg-cr.

                @return: True if the command succeeded and a guid was found
                         in its output, False otherwise.
                """
                local_path = urlparse(file)[2]

                fsize = os.path.getsize(local_path)
                fname = os.path.basename(local_path)
                fpath = os.path.abspath(local_path)

                md5sum = get_md5sum(fpath, ignoreGzipTimestamp=True)

                ## decide number of parallel stream to be used,
                ## limited to the range [1, 8]
                nbstream = int((fsize * 1.0) / (10.0 * 1024 * 1024 * 1024))
                nbstream = max(1, min(8, nbstream))

                cmd = 'lcg-cr -t 180 --vo %s -n %d' % (self.cacheObj.vo, nbstream)

                # 'se' defaults to the empty string, so test for a non-empty
                # value: an empty '-d ' would leave the option without its
                # argument in the generated command line.
                if self.cacheObj.se:
                    cmd = cmd + ' -d %s' % self.cacheObj.se
                if self.cacheObj.se_type == 'srmv2' and self.cacheObj.srm_token:
                    cmd = cmd + ' -D srmv2 -s %s' % self.cacheObj.srm_token

                ## specify the physical location
                cmd = cmd + ' -P %s/ganga.%s/%s' % (self.cacheObj.se_rpath, self.dirname, fname)

                ## specify the logical filename
                ## NOTE: here we assume the root dir for VO is /grid/<voname>
                lfc_dir = '/grid/%s/ganga.%s' % (self.cacheObj.vo, self.dirname)
                if not self.cacheObj.__lfc_mkdir__(shell, lfc_dir):
                    self.cacheObj.logger.warning('cannot create LFC directory: %s', lfc_dir)
                    # same failure value as the other failure paths below
                    return False

                cmd = cmd + ' -l %s/%s %s' % (lfc_dir, fname, file)
                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    return False

                # the guid of the new entry is taken from the command output
                match = re.search(r'(guid:\S+)', output)
                if not match:
                    return False

                fidx = LCGFileIndex()
                fidx.id          = match.group(1)
                fidx.name        = fname
                fidx.md5sum      = md5sum
                fidx.lfc_host    = self.cacheObj.lfc_host
                fidx.local_fpath = fpath

                self.__appendResult__(file, fidx)
                return True

        myAlg  = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_lcgcr', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        return runner.getResults().values()

    def impl_download(self, files=None, dest_dir=None, opts=''):
        """
        Downloads multiple files from remote grid storages to
        a local directory.

        @param files:    the file index objects to download; each must
                         provide 'id' and an 'attributes' mapping holding
                         'local_fpath' and 'lfc_host'. Defaults to no files.
        @param dest_dir: the local target directory; the current working
                         directory is used when it is not given.
        @param opts:     not used by this implementation.
        @return: the values of the runner's result table, i.e. the file
                 index objects that were downloaded successfully.
        """
        # A fresh list per call: the collection is handed over to Data/MTRunner.
        if files is None:
            files = []

        if not dest_dir:
            dest_dir = os.getcwd()
        self.logger.debug('download file to: %s', dest_dir)

        # the algorithm of downloading one file to a local directory
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj
                self.shell = getShell(self.cacheObj.middleware)

            def process(self, file):
                """
                Copy one file with lcg-cp.

                @return: True if the command succeeded, False otherwise.
                """
                guid        = file.id
                local_fpath = file.attributes['local_fpath']
                lfc_host    = file.attributes['lfc_host']

                # the downloaded copy keeps the base name of the original file
                fname = os.path.basename(urlparse(local_fpath)[2])

                self.shell.env['LFC_HOST'] = lfc_host
                self.cacheObj.logger.debug('download file with LFC_HOST: %s', self.shell.env['LFC_HOST'])

                cmd = 'lcg-cp -t %d --vo %s ' % (self.cacheObj.timeout, self.cacheObj.vo)
                if self.cacheObj.se_type:
                    cmd += '-T %s ' % self.cacheObj.se_type
                cmd += '%s file://%s/%s' % (guid, dest_dir, fname)

                self.cacheObj.logger.debug('download file: %s', cmd)

                rc, output, m = self.cacheObj.__cmd_retry_loop__(self.shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    return False

                self.__appendResult__(file.id, file)
                return True

        myAlg  = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_lcgcp', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        return runner.getResults().values()

    def impl_delete(self, files=None, opts=''):
        """
        Deletes multiple files from remote grid storages.

        After the deletion the local index is rewritten so that it holds
        only the cached files that were not deleted.

        @param files: the file index objects to delete; each must provide
                      'id' and an 'attributes' mapping holding 'lfc_host'.
                      Defaults to no files.
        @param opts:  not used by this implementation.
        @return: the file index objects that were deleted successfully.
        """
        # A fresh list per call: the collection is handed over to Data/MTRunner.
        if files is None:
            files = []

        # the algorithm of deleting one file from the grid storage
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj
                self.shell = getShell(self.cacheObj.middleware)

            def process(self, file):
                """
                Remove one file with lcg-del.

                @return: True if the command succeeded, False otherwise.
                """
                guid     = file.id
                lfc_host = file.attributes['lfc_host']

                self.shell.env['LFC_HOST'] = lfc_host
                self.cacheObj.logger.debug('delete file with LFC_HOST: %s', self.shell.env['LFC_HOST'])

                cmd = 'lcg-del -a -t 60 --vo %s %s' % (self.cacheObj.vo, guid)

                rc, output, m = self.cacheObj.__cmd_retry_loop__(self.shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    return False

                self.__appendResult__(file.id, file)
                return True

        myAlg  = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_lcgdel', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        ## update the local index file
        del_files = runner.getResults().values()
        all_files = self.get_cached_files()

        left_files = [f for f in all_files if f not in del_files]

        self.impl_bookkeepUploadedFiles(left_files, append=False)

        return del_files

    # For GUID protocol
    def __lfc_mkdir__(self, shell, path, mode='775'):
        '''
        Creates a directory in LFC.

        @param shell: the shell object used to run the command.
        @param path:  the LFC directory to create (created with -p).
        @param mode:  the permission mode given to lfc-mkdir.
        @return: True if the command returned 0, False otherwise.
        '''
        cmd = 'lfc-mkdir -p -m %s %s' % (mode, path)

        # a single attempt only
        (rc, output, m) = self.__cmd_retry_loop__(shell, cmd, 1)

        return rc == 0
278