import os
import os.path
import re
from urlparse import urlparse

from Ganga.GPIDev.Schema import *
from Ganga.GPIDev.Lib.File import *

from Ganga.Utility.logging import getLogger
from Ganga.Utility.GridShell import getShell

from Ganga.Lib.LCG.GridSandboxCache import GridSandboxCache, GridFileIndex
from Ganga.Lib.LCG.Utility import *
from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm

gridftp_sandbox_cache_schema_datadict = GridSandboxCache._schema.inherit_copy().datadict
gridftp_file_index_schema_datadict = GridFileIndex._schema.inherit_copy().datadict


class GridftpFileIndex(GridFileIndex):
    """
    Data object containing gridftp file index information.

    - id: gsiftp URI
    - name: basename of the file
    - md5sum: md5 checksum
    - attributes['fpath']: path of the file on the local machine

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    """

    _schema = Schema(Version(1, 0), gridftp_file_index_schema_datadict)
    _category = 'GridFileIndex'
    _name = 'GridftpFileIndex'


class GridftpSandboxCache(GridSandboxCache):
    '''
    Helper class for uploading/downloading/deleting sandbox files on a
    remote gridftp server, using the uberftp and globus-url-copy commands
    with the gsiftp protocol.

    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com
    '''

    gridftp_sandbox_cache_schema_datadict.update({
        'baseURI': SimpleItem(defvalue='', copyable=1, doc='the base URI for storing cached files')
    })

    _schema = Schema(Version(1, 0), gridftp_sandbox_cache_schema_datadict)
    _category = 'GridSandboxCache'
    _name = 'GridftpSandboxCache'

    logger = getLogger()

    def impl_upload(self, files=[], opts=''):
        """
        Uploads multiple files to a remote gridftp server.
        """

        shell = getShell(self.middleware)

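        # compose a unique remote directory name under baseURI for this upload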
        dirname = self.__get_unique_fname__()

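        # check that the destination directory exists on the remote server,
        # creating it if the 'cd' probe reports FTP error 550 (not found)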
        dir_ok = False

        destURI = '%s/%s' % (self.baseURI, dirname)

        uri_info = urisplit(destURI)

        cmd = 'uberftp %s "cd %s"' % (uri_info[1], uri_info[2])

        rc, output, m = self.__cmd_retry_loop__(shell, cmd, 1)

        if rc != 0:
            for l in output.split('\n'):
                l = l.strip()
                if re.match(r'^550.*', l):
                    # directory not found (FTP error 550): try to create it
                    cmd = 'uberftp %s "mkdir %s"' % (uri_info[1], uri_info[2])

                    rc, output, m = self.__cmd_retry_loop__(shell, cmd, 1)

                    if rc != 0:
                        self.logger.error(output)
                    else:
                        dir_ok = True

                    break
        else:
            self.logger.debug('parent directory already available: %s' % destURI)
            dir_ok = True

        if not dir_ok:
            self.logger.error('parent directory not available: %s' % destURI)
            return []

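        # algorithm for uploading a single file; fed to the MTRunner below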
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj

            def process(self, file):
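                # resolve the local file path from the file URI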
                fsize = os.path.getsize(urlparse(file)[2])
                fname = os.path.basename(urlparse(file)[2])
                fpath = os.path.abspath(urlparse(file)[2])

                md5sum = get_md5sum(fpath, ignoreGzipTimestamp=True)
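
                # use roughly one parallel ftp stream per 10 GB of file size, capped between 1 and 8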
                nbstream = int((fsize * 1.0) / (10.0 * 1024 * 1024 * 1024))

                if nbstream < 1:
                    nbstream = 1
                if nbstream > 8:
                    nbstream = 8

                myDestURI = '%s/%s' % (destURI, fname)

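                # upload the local file to the remote gsiftp URI with uberftp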
                cmd = 'uberftp'
                if nbstream > 1:
                    cmd += ' -c %d' % nbstream

                cmd += ' file:%s %s' % (fpath, myDestURI)

                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    self.cacheObj.logger.error(output)
                    return False
                else:
                    fidx = GridftpFileIndex()
                    fidx.id = myDestURI
                    fidx.name = fname
                    fidx.md5sum = md5sum
                    fidx.attributes['fpath'] = fpath

                    self.__appendResult__(file, fidx)
                    return True

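        # run the upload algorithm over all given files with the MTRunner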
        myAlg = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_gridftp', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        return runner.getResults().values()

    def impl_download(self, files=[], dest_dir=None, opts=''):
        """
        Downloads multiple files from a gridftp server to
        a local directory.
        """
        if not dest_dir:
            dest_dir = os.getcwd()
        self.logger.debug('download file to: %s', dest_dir)

        shell = getShell(self.middleware)

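        # algorithm for downloading a single file; fed to the MTRunner below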
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj

            def process(self, file):
                srcURI = file.id
                fname = os.path.basename(urisplit(srcURI)[2])
                destURI = 'file:%s/%s' % (dest_dir, fname)

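                # copy the remote gsiftp URI to the local destination with globus-url-copy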
                cmd = 'globus-url-copy %s %s' % (srcURI, destURI)

                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    self.cacheObj.logger.error(output)
                    return False
                else:
                    self.__appendResult__(file.id, file)
                    return True

        myAlg = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_gridftp', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

        return runner.getResults().values()

    def impl_delete(self, files=[], opts=''):
        """
        Deletes multiple files from a remote gridftp server.
        """

        shell = getShell(self.middleware)

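        # algorithm for deleting a single remote file; fed to the MTRunner below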
        class MyAlgorithm(Algorithm):

            def __init__(self, cacheObj):
                Algorithm.__init__(self)
                self.cacheObj = cacheObj

            def process(self, file):
                destURI = file.id

                uri_info = urisplit(destURI)

                cmd = 'uberftp %s "rm %s"' % (uri_info[1], uri_info[2])

                rc, output, m = self.cacheObj.__cmd_retry_loop__(shell, cmd, self.cacheObj.max_try)

                if rc != 0:
                    self.cacheObj.logger.error(output)
                    return False
                else:
                    self.__appendResult__(file.id, file)
                    return True

        myAlg = MyAlgorithm(cacheObj=self)
        myData = Data(collection=files)

        runner = MTRunner(name='sandboxcache_lcgdel', algorithm=myAlg, data=myData)
        runner.start()
        runner.join(-1)

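        # update the local index: keep only the files that were not deleted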
        del_files = runner.getResults().values()
        all_files = self.get_cached_files()

        left_files = []
        for f in all_files:
            if f not in del_files:
                left_files.append(f)

        self.impl_bookkeepUploadedFiles(left_files, append=False)

        return del_files