Package Ganga :: Package Lib :: Package LCG :: Module GridftpSandboxCache
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Lib.LCG.GridftpSandboxCache

import os
import os.path
import re
from urlparse import urlparse

from Ganga.GPIDev.Schema import *
from Ganga.GPIDev.Lib.File import *

from Ganga.Utility.GridShell import getShell

from Ganga.Lib.LCG.GridSandboxCache import GridSandboxCache, GridFileIndex
from Ganga.Lib.LCG.Utility import *
from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm
 13   
# Copies of the parent classes' schema data dictionaries. inherit_copy() is
# used so the subclasses below can extend these dicts (e.g. add 'baseURI')
# without mutating the schemas of GridSandboxCache / GridFileIndex themselves.
gridftp_sandbox_cache_schema_datadict = GridSandboxCache._schema.inherit_copy().datadict
gridftp_file_index_schema_datadict    = GridFileIndex._schema.inherit_copy().datadict
 16   
class GridftpFileIndex(GridFileIndex):
    """
    File-index data object for files cached on a gridftp server.

    The inherited fields are used as follows:
      - id: the gsiftp URI of the remote copy
      - name: the file's basename
      - md5sum: md5 checksum of the file
      - attributes['fpath']: the file's path on the local machine

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    """

    _name = 'GridftpFileIndex'
    _category = 'GridFileIndex'
    _schema = Schema(Version(1, 0), gridftp_file_index_schema_datadict)

    def __init__(self):
        super(GridftpFileIndex, self).__init__()
class GridftpSandboxCache(GridSandboxCache):
    '''
    Helper class for uploading/downloading/deleting sandbox files on a
    remote gridftp server, using the uberftp and globus-url-copy commands
    with the gsiftp protocol.

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    '''

    gridftp_sandbox_cache_schema_datadict.update({
        'baseURI': SimpleItem(defvalue='', copyable=1, doc='the base URI for storing cached files')
    })

    _schema = Schema(Version(1, 0), gridftp_sandbox_cache_schema_datadict)
    _category = 'GridSandboxCache'
    _name = 'GridftpSandboxCache'

    logger = getLogger()

    def __init__(self):
        super(GridftpSandboxCache, self).__init__()
        self.protocol = 'gsiftp'

    def impl_upload(self, files=[], opts=''):
        """
        Uploads multiple files to the remote gridftp server under a fresh
        unique subdirectory of self.baseURI.

        @param files: list of local file paths to upload
        @param opts: unused here; kept for interface compatibility
        @return: list of GridftpFileIndex objects for the uploaded files
                 (empty list if the remote directory could not be prepared)
        """
        shell = getShell(self.middleware)

        ## unique subdirectory on the remote storage for this upload batch
        dirname = self.__get_unique_fname__()

        dir_ok = False
        destURI = '%s/%s' % (self.baseURI, dirname)
        uri_info = urisplit(destURI)

        ## probe whether the destination directory already exists
        cmd = 'uberftp %s "cd %s"' % (uri_info[1], uri_info[2])
        rc, output, m = self.__cmd_retry_loop__(shell, cmd, 1)

        if rc != 0:
            for l in output.split('\n'):
                # BUGFIX: the original called l.strip() and discarded the
                # result, so leading whitespace could defeat the 550 match.
                l = l.strip()
                if re.match(r'^550.*', l):
                    ## directory not found (FTP code 550): try to create
                    ## the lowest-level one
                    cmd = 'uberftp %s "mkdir %s"' % (uri_info[1], uri_info[2])
                    rc, output, m = self.__cmd_retry_loop__(shell, cmd, 1)
                    if rc != 0:
                        self.logger.error(output)
                    else:
                        dir_ok = True
                    break
        else:
            self.logger.debug('parent directory already available: %s' % destURI)
            dir_ok = True

        if not dir_ok:
            self.logger.error('parent directory not available: %s' % destURI)
            return []

        ## the algorithm for uploading one file
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj

            def process(self, file):
                fsize = os.path.getsize(urlparse(file)[2])
                fname = os.path.basename(urlparse(file)[2])
                fpath = os.path.abspath(urlparse(file)[2])

                md5sum = get_md5sum(fpath, ignoreGzipTimestamp=True)

                ## one parallel stream per 10 GB of file size, clamped to 1..8
                nbstream = int((fsize * 1.0) / (10.0 * 1024 * 1024 * 1024))
                if nbstream < 1:
                    nbstream = 1  # min stream
                if nbstream > 8:
                    nbstream = 8  # max stream

                myDestURI = '%s/%s' % (destURI, fname)

                ## uploading the file
                cmd = 'uberftp'
                if nbstream > 1:
                    cmd += ' -c %d' % nbstream
                cmd += ' file:%s %s' % (fpath, myDestURI)

                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    self.cacheObj.logger.error(output)
                    return False
                else:
                    ## record the uploaded file in a GridftpFileIndex
                    fidx = GridftpFileIndex()
                    fidx.id = myDestURI
                    fidx.name = fname
                    fidx.md5sum = md5sum
                    fidx.attributes['fpath'] = fpath

                    self.__appendResult__(file, fidx)
                    return True

        myAlg = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_gridftp', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        return runner.getResults().values()

    def impl_download(self, files=[], dest_dir=None, opts=''):
        """
        Downloads multiple files from the gridftp server to a local directory.

        @param files: list of GridftpFileIndex objects to fetch
        @param dest_dir: local target directory (defaults to the CWD)
        @param opts: unused here; kept for interface compatibility
        @return: list of the file-index objects successfully downloaded
        """
        if not dest_dir:
            dest_dir = os.getcwd()
        self.logger.debug('download file to: %s', dest_dir)

        shell = getShell(self.middleware)

        # the algorithm for downloading one file to a local directory
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj

            def process(self, file):
                srcURI = file.id
                fname = os.path.basename(urisplit(srcURI)[2])
                destURI = 'file:%s/%s' % (dest_dir, fname)

                cmd = 'globus-url-copy %s %s' % (srcURI, destURI)

                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    self.cacheObj.logger.error(output)
                    return False
                else:
                    self.__appendResult__(file.id, file)
                    return True

        myAlg = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_gridftp', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        return runner.getResults().values()

    def impl_delete(self, files=[], opts=''):
        """
        Deletes multiple files from the remote gridftp server and updates
        the local index file accordingly.

        @param files: list of GridftpFileIndex objects to delete
        @param opts: unused here; kept for interface compatibility
        @return: list of the file-index objects successfully deleted
        """
        shell = getShell(self.middleware)

        # the algorithm for deleting one remote file
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj

            def process(self, file):
                destURI = file.id
                uri_info = urisplit(destURI)

                cmd = 'uberftp %s "rm %s"' % (uri_info[1], uri_info[2])

                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    self.cacheObj.logger.error(output)
                    return False
                else:
                    self.__appendResult__(file.id, file)
                    return True

        myAlg = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_lcgdel', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        ## update the local index file: keep only the files that were
        ## not successfully deleted
        del_files = runner.getResults().values()
        all_files = self.get_cached_files()

        left_files = []
        for f in all_files:
            if f not in del_files:
                left_files.append(f)

        self.impl_bookkeepUploadedFiles(left_files, append=False)

        return del_files