1
2
3
4
5
6
7
8
9
10
11
12 import os
13 import os.path
14 import re
15 from types import *
16 from urlparse import urlparse
17
18 from Ganga.Core.GangaThread.MTRunner import MTRunner, Data, Algorithm
19
20 from Ganga.GPIDev.Schema import *
21 from Ganga.GPIDev.Lib.File import *
22
23 from Ganga.Utility.logging import getLogger
24 from Ganga.Utility.GridShell import getShell
25
26 from Ganga.Lib.LCG.GridSandboxCache import GridSandboxCache, GridFileIndex
27 from Ganga.Lib.LCG.Utility import *
28
29 lcg_sandbox_cache_schema_datadict = GridSandboxCache._schema.inherit_copy().datadict
30 lcg_file_index_schema_datadict = GridFileIndex._schema.inherit_copy().datadict
31
51
53 '''
54 Helper class for uploading/downloading/deleting sandbox files using lcg-cr/lcg-cp/lcg-del commands.
55
56 @author: Hurng-Chun Lee
57 @contact: hurngchunlee@gmail.com
58 '''
59
60 lcg_sandbox_cache_schema_datadict.update({
61 'se' : SimpleItem(defvalue='', copyable=1, doc='the LCG SE hostname'),
62 'se_type' : SimpleItem(defvalue='srmv2', copyable=1, doc='the LCG SE type'),
63 'se_rpath' : SimpleItem(defvalue='generated', copyable=1, doc='the relative path to the VO directory on the SE'),
64 'lfc_host' : SimpleItem(defvalue='', copyable=1, doc='the LCG LFC hostname'),
65 'srm_token' : SimpleItem(defvalue='', copyable=1, doc='the SRM space token, meaningful only when se_type is set to srmv2')
66 } )
67
68 _schema = Schema( Version(1,0), lcg_sandbox_cache_schema_datadict )
69 _category = 'GridSandboxCache'
70 _name = 'LCGSandboxCache'
71
72 logger = getLogger()
73
77
79 if attr == 'se_type' and value not in ['','srmv1','srmv2','se']:
80 raise AttributeError('invalid se_type: %s' % value)
81 super(LCGSandboxCache,self).__setattr__(attr, value)
82
84 """
85 Uploads multiple files to a remote grid storage.
86 """
87
88 shell = getShell(self.middleware)
89
90 if self.lfc_host:
91 shell.env['LFC_HOST'] = self.lfc_host
92
93 self.logger.debug('upload file with LFC_HOST: %s', shell.env['LFC_HOST'])
94
95
class MyAlgorithm(Algorithm):
    """
    Algorithm that uploads a single local file with lcg-cr and records
    an LCGFileIndex describing the uploaded copy.

    Every file handled by one instance goes under the same
    'ganga.<dirname>' directory; <dirname> is generated once in the
    constructor.
    """

    def __init__(self, cacheObj):
        """
        @param cacheObj: the sandbox cache object providing the upload
                         settings (vo, se, se_type, se_rpath, srm_token,
                         lfc_host, max_try) and the helper methods used
                         by process().
        """
        Algorithm.__init__(self)
        self.cacheObj = cacheObj
        self.dirname = self.cacheObj.__get_unique_fname__()

    def process(self, file):
        """
        Uploads one file and registers the result.

        @param file: URL (or plain path) of the local file. The path
                     component of the parsed URL locates the file on
                     disk; the value itself is handed unchanged to
                     lcg-cr as the source.
        @return: True when lcg-cr succeeded and a guid was found in its
                 output; False when lcg-cr failed or printed no guid;
                 None when the LFC directory could not be created.
        """
        cache = self.cacheObj

        # parse the URL once; index 2 is the path component
        local_path = urlparse(file)[2]

        fsize = os.path.getsize(local_path)
        fname = os.path.basename(local_path)
        fpath = os.path.abspath(local_path)

        md5sum = get_md5sum(fpath, ignoreGzipTimestamp=True)

        # One stream per 10 GiB of data, clamped to the range [1, 8].
        # NOTE(review): with a 10 GiB divisor any smaller file gets a
        # single stream - confirm that 10 GiB (not e.g. 10 MiB) is meant.
        nbstream = int((fsize * 1.0) / (10.0 * 1024 * 1024 * 1024))
        nbstream = max(1, min(nbstream, 8))

        cmd = 'lcg-cr -t 180 --vo %s -n %d' % (cache.vo, nbstream)

        # NOTE(review): the schema default of 'se' is an empty string,
        # which still passes this test and yields '-d ' with no value -
        # confirm whether an empty 'se' should skip the option.
        if cache.se is not None:
            cmd = cmd + ' -d %s' % cache.se

        if cache.se_type == 'srmv2' and cache.srm_token:
            cmd = cmd + ' -D srmv2 -s %s' % cache.srm_token

        # physical location, relative to the VO directory on the SE
        cmd = cmd + ' -P %s/ganga.%s/%s' % (cache.se_rpath, self.dirname, fname)

        # logical location; the VO root in the LFC is taken to be /grid/<vo>
        lfc_dir = '/grid/%s/ganga.%s' % (cache.vo, self.dirname)

        # 'shell' is not an attribute of this class: it is resolved from
        # the scope enclosing the class definition.
        if not cache.__lfc_mkdir__(shell, lfc_dir):
            cache.logger.warning('cannot create LFC directory: %s', lfc_dir)
            return None

        cmd = cmd + ' -l %s/%s %s' % (lfc_dir, fname, file)
        rc, output, m = cache.__cmd_retry_loop__(shell, cmd, cache.max_try)

        if rc != 0:
            return False

        # raw string: '\S' must reach the regex engine untouched
        match = re.search(r'(guid:\S+)', output)
        if not match:
            return False

        fidx = LCGFileIndex()
        fidx.id = match.group(1)
        fidx.name = fname
        fidx.md5sum = md5sum
        fidx.lfc_host = cache.lfc_host
        fidx.local_fpath = fpath

        self.__appendResult__(file, fidx)
        return True
152
153 myAlg = MyAlgorithm(cacheObj=self)
154 myData = Data(collection=files)
155
156 runner = MTRunner(name='sandboxcache_lcgcr', algorithm=myAlg, data=myData)
157 runner.start()
158 runner.join(-1)
159
160 return runner.getResults().values()
161
163 """
164 Downloads multiple files from remote grid storages to
165 a local directory.
166 """
167 if not dest_dir:
168 dest_dir = os.getcwd()
169 self.logger.debug('download file to: %s', dest_dir)
170
171
class MyAlgorithm(Algorithm):
    """
    Algorithm that fetches a single indexed file with lcg-cp.

    The target directory ('dest_dir') is not an attribute of this
    class; it is resolved from the scope enclosing the class definition.
    """

    def __init__(self, cacheObj):
        """
        @param cacheObj: the sandbox cache object providing the settings
                         (middleware, timeout, vo, se_type, max_try) and
                         the retry helper used by process().
        """
        Algorithm.__init__(self)
        self.cacheObj = cacheObj
        self.shell = getShell(self.cacheObj.middleware)

    def process(self, file):
        """
        Downloads one file described by a file index object.

        @param file: file index object; its 'id' is passed to lcg-cp as
                     the source and its 'local_fpath' / 'lfc_host'
                     attributes give the local name and the LFC host.
        @return: True if lcg-cp returned 0 (the index is then recorded
                 as a result under its id), otherwise False.
        """
        cache = self.cacheObj

        guid = file.id
        lfn = file.attributes['local_fpath']
        lfc_host = file.attributes['lfc_host']

        # the downloaded copy keeps the base name of the original local path
        fname = os.path.basename(urlparse(lfn)[2])

        self.shell.env['LFC_HOST'] = lfc_host
        cache.logger.debug('download file with LFC_HOST: %s', self.shell.env['LFC_HOST'])

        # assemble the command piecewise; '-T' is emitted only for a non-empty se_type
        pieces = ['lcg-cp -t %d --vo %s ' % (cache.timeout, cache.vo)]
        if cache.se_type:
            pieces.append('-T %s ' % cache.se_type)
        pieces.append('%s file://%s/%s' % (guid, dest_dir, fname))
        cmd = ''.join(pieces)

        cache.logger.debug('download file: %s', cmd)

        rc, output, m = cache.__cmd_retry_loop__(self.shell, cmd, cache.max_try)

        if rc == 0:
            self.__appendResult__(file.id, file)
            return True

        return False
203
204 myAlg = MyAlgorithm(cacheObj=self)
205 myData = Data(collection=files)
206
207 runner = MTRunner(name='sandboxcache_lcgcp', algorithm=myAlg, data=myData)
208 runner.start()
209 runner.join(-1)
210
211 return runner.getResults().values()
212
214 """
215 Deletes multiple files from remote grid storages.
216 """
217
218
class MyAlgorithm(Algorithm):
    """
    Algorithm that removes a single indexed file with 'lcg-del -a'.
    """

    def __init__(self, cacheObj):
        """
        @param cacheObj: the sandbox cache object providing the settings
                         (middleware, vo, max_try) and the retry helper
                         used by process().
        """
        Algorithm.__init__(self)
        self.cacheObj = cacheObj
        self.shell = getShell(self.cacheObj.middleware)

    def process(self, file):
        """
        Deletes one file described by a file index object.

        @param file: file index object; its 'id' is passed to lcg-del
                     and its 'lfc_host' attribute selects the LFC host.
        @return: True if lcg-del returned 0 (the index is then recorded
                 as a result under its id), otherwise False.
        """
        cache = self.cacheObj
        guid = file.id

        # point the shell at the catalogue this file was registered in
        self.shell.env['LFC_HOST'] = file.attributes['lfc_host']

        cache.logger.debug('delete file with LFC_HOST: %s' % self.shell.env['LFC_HOST'])

        cmd = 'lcg-del -a -t 60 --vo %s %s' % (cache.vo, guid)

        rc, output, m = cache.__cmd_retry_loop__(self.shell, cmd, cache.max_try)

        if rc == 0:
            self.__appendResult__(file.id, file)
            return True

        return False
245
246 myAlg = MyAlgorithm(cacheObj=self)
247 myData = Data(collection=files)
248
249 runner = MTRunner(name='sandboxcache_lcgdel', algorithm=myAlg, data=myData)
250 runner.start()
251 runner.join(-1)
252
253
254 del_files = runner.getResults().values()
255 all_files = self.get_cached_files()
256
257 left_files = []
258 for f in all_files:
259 if f not in del_files:
260 left_files.append(f)
261
262 self.impl_bookkeepUploadedFiles(left_files, append=False)
263
264 return del_files
265
266
268 '''Creates a directory in LFC'''
269
270 cmd = 'lfc-mkdir -p -m %s %s' % (mode, path)
271
272 (rc, output, m) = self.__cmd_retry_loop__(shell, cmd, 1)
273
274 if rc != 0:
275 return False
276 else:
277 return True
278