1
2 from Ganga.Utility.Config import getConfig, ConfigError
3 from Ganga.Utility.logging import getLogger
4 from Ganga.Utility.util import isStringLike
5
6 from Ganga.Utility.GridShell import getShell
7 from Ganga.Lib.LCG.GridCache import GridCache
8
9 from Ganga.GPIDev.Credentials.ICredential import ICredential
10
# makeConfig is not among the names imported from Ganga.Utility.Config at the
# top of this file; import it here so the module-level config can be built.
from Ganga.Utility.Config import makeConfig

logger = getLogger()

# Make it obvious in the log that jobs are simulated, not sent to the Grid.
logger.critical('LCG Grid Simulator ENABLED')

# The *_time options and job_finish_time are python expressions (or plain
# numbers) giving seconds; they are evaluated by get_number() with the
# `random` module in scope. The *_failure_rate options are probabilities.
config = makeConfig('GridSimulator','Grid Simulator configuration parameters')

config.addOption('submit_time','random.uniform(1,10)','python expression which returns the time it takes (in seconds) to complete the Grid.submit() command (also for subjob in bulk emulation)')
config.addOption('submit_failure_rate',0.0,'probability that the Grid.submit() method fails')

config.addOption('cancel_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the Grid.cancel() command (also for subjob in bulk emulation)')
config.addOption('cancel_failure_rate',0.0,'probability that the Grid.cancel() method fails')

config.addOption('status_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the status command (also for subjob in bulk emulation)')
# The status code reads the two options below, but they were never declared,
# so every status query would fail on an undefined option. Declare them with
# the same default distribution as 'status_time'.
config.addOption('single_status_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the status command for a single job (also for subjob in bulk emulation)')
config.addOption('master_status_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the status command for a natively bulk master job')

config.addOption('get_output_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the get_output command (also for subjob in bulk emulation)')

config.addOption('job_id_resolved_time','random.uniform(1,2)','python expression which returns the time it takes (in seconds) to complete the resolution of all the id of a subjob (when submitted in bulk) this is the time the NODE_ID becomes available from the monitoring)')

config.addOption('job_finish_time', 'random.uniform(10,20)', 'python expression which returns the time when the job enters the Done success or Failed state' )
config.addOption('job_failure_rate', 0.0, 'probability of the job to enter the Failed state')
43
def sleep(val):
    """Block for a simulated command duration.

    val is a number or a python expression string, converted by get_number().
    A negative result is treated as zero, because time.sleep() raises
    ValueError for negative durations. This matters for configured
    expressions such as random.gauss(...), which can go below zero.
    """
    import time
    time.sleep(max(0, get_number(val)))
47
def failed(val):
    """Return True with probability get_number(val), i.e. a simulated failure."""
    import random
    threshold = get_number(val)
    # Draw order matches the original: the expression is evaluated first,
    # then one uniform sample is compared against it.
    return threshold > random.random()
52
def get_number(val):
    """Convert a simulator configuration value into a number.

    val is either a number, or a string holding a python expression that is
    evaluated with the `random` module in scope (e.g. 'random.uniform(1,5)').
    The expression is re-evaluated on every call, so random expressions yield
    a fresh sample each time.

    Returns the resulting int/float. When the expression cannot be evaluated,
    or does not produce an int/float, an error is logged and 0 is returned.
    """
    import random
    if isinstance(val, str):
        # NOTE(security): eval() of a configuration string. This is acceptable
        # only because the value comes from the user's own Ganga configuration.
        try:
            t = eval(val, {'random': random})
        except Exception:
            logger.error('problem with configuration option, invalid value: %s',val)
            return 0
    else:
        t = val
    # Exact type check on purpose: bool (a subclass of int) is rejected.
    if type(t) not in (float, int):
        logger.error('problem with configuration option, invalid value: %s',val)
        return 0
    return t
65
66 import os
67 import time
68
# Command label of the simulator. It is not referenced in the visible part
# of this file; presumably it is read by code elsewhere.
cmd = 'simulation'
70
72 '''Simulator of LCG interactions'''
73
# Class-level default; the constructor sets a per-instance, upper-cased
# value from its `middleware` argument.
74 middleware = 'GLITE'
75
# Class-level placeholder; the constructor replaces it with ICredential().
76 credential = None
77
# Constructor body: sets up the simulator's identity and its two persistent
# shelve stores.
79 self.active=True
# Middleware name is normalised to upper case.
80 self.middleware = middleware.upper()
81 self.credential = ICredential()
82
83
84
# Data files are created relative to the current working directory.
85 basedir = '.'
86 self.gridmap_filename = '%s/lcg_simulator_gridmap'%basedir
87 import shelve
88
# Persistent map: job id -> input directory of the submitted JDL.
# It also holds the '_job_count' counter used to mint new job ids.
89 self.jobid_map=shelve.open(self.gridmap_filename,writeback=False)
90 self.jobid_map.setdefault('_job_count',0)
91
92
93 self.finished_jobs_filename = '%s/lcg_simulator_finished_jobs'%basedir
# Persistent map: job id -> time.time() when the job was seen as finished.
# It is written when a status query reports an aborted job and when
# output is retrieved.
94 self.ganga_finish_time = shelve.open(self.finished_jobs_filename,writeback=False)
95
# Always a GLITE shell, independent of the `middleware` argument.
96 self.shell = getShell('GLITE')
97
# NOTE(review): neither shelve is closed or synced in the visible code;
# confirm this happens elsewhere, or pending writes may be lost.
98 logger.critical('Grid Simulator data files: %s %s',self.gridmap_filename,self.finished_jobs_filename)
99
102
def submit(self,jdlpath,ce=None):
    '''This method is used for normal and native bulk submission supported by GLITE middleware.

    The JDL file is read as a python literal. If its 'Type' is 'collection',
    every node listed in its 'Nodes' text is first submitted on its own
    through _submit(). The master is then submitted, carrying the list of
    node ids.

    Returns the master job id, or None when the master submission is
    simulated to fail.
    '''

    logger.debug('job submit command: submit(jdlpath=%s,ce=%s)',jdlpath,ce)

    # NOTE(security): the JDL file is eval()'d; it must be a locally written
    # file, never untrusted input. `with` closes the handle deterministically.
    with open(jdlpath) as f:
        jdl = eval(f.read())

    subjob_ids = []
    if jdl.get('Type') == 'collection':
        import re

        # Capture the node name and the node JDL path. The path now stops at
        # the closing quote: the former greedy \S* also pulled the quote (and
        # any characters after it) into the captured path.
        r = re.compile(r'.*NodeName = "(gsj_\d+)"; file="([^"\s]*)"?')
        # The first and last lines of the 'Nodes' text are skipped.
        for line in jdl['Nodes'].splitlines()[1:-1]:
            m = r.match(line)
            if m:
                nodename,sjdl_path = m.groups()
                # NOTE(review): a failed node submission appends None here.
                subjob_ids.append(self._submit(sjdl_path,ce,[],nodename=nodename))

    masterid = self._submit(jdlpath,ce,subjob_ids)

    return masterid
124
128
def _submit(self,jdlpath,ce,subjob_ids,nodename=None):
    '''Submit a JDL file to LCG (simulated).

    After the configured submit_time delay, the submission fails with
    probability submit_failure_rate. Otherwise a new job id is minted and
    mapped to the directory containing the JDL. In both cases the simulated
    runtime parameters are written, as a python dict literal, to a 'params'
    file in that directory.

    Returns the new job id, or None when the submission is simulated to fail.
    '''

    logger.debug('job submit command: _submit(jdlpath=%s,ce=%s,subjob_ids=%s)',jdlpath,ce,subjob_ids)

    inputdir = os.path.dirname(os.path.realpath(jdlpath))

    def write():
        # Close the file so the params are flushed before they are read back.
        with open(os.path.join(inputdir,'params'),'w') as f:
            f.write(repr(runtime_params))

    runtime_params = {}
    runtime_params['submission_time_start'] = time.time()

    sleep(config['submit_time'])
    runtime_params['submission_time_stop'] = time.time()

    if failed(config['submit_failure_rate']):
        runtime_params['status'] = 'failed_to_submit'
        write()
        logger.warning('Job submission failed.')
        return None

    jobid = self._make_new_id()

    self.jobid_map[jobid] = inputdir

    runtime_params['jobid'] = jobid
    runtime_params['status'] = 'submitted'
    runtime_params['should_fail'] = failed(config['job_failure_rate'])
    # Both deadlines are absolute timestamps. status() compares
    # expected_job_id_resolve_time with time.time(), so storing only the
    # configured duration (a few seconds) made node ids look resolved at once.
    runtime_params['expected_job_id_resolve_time'] = time.time()+get_number(config['job_id_resolved_time'])
    runtime_params['expected_finish_time'] = time.time()+get_number(config['job_finish_time'])
    runtime_params['subjob_ids'] = subjob_ids
    runtime_params['nodename'] = nodename
    write()
    return jobid
164
166 self.jobid_map['_job_count'] += 1
167 jobid = 'https://ganga.simulator.cern.ch/%d'%self.jobid_map['_job_count']
168 return jobid
169
179
def native_master_cancel(self,jobid):
    '''Native bulk cancellation supported by GLITE middleware.

    Delegates to self._cancel(jobid) and returns its result.
    '''

    # The format string previously lacked the closing parenthesis.
    logger.debug('job cancel command: native_master_cancel(jobid=%s)',jobid)

    return self._cancel(jobid)
187
def _status(self,jobid,has_id):
    '''Simulate the status query of a single job or bulk node.

    Loads the job's params, waits for the configured single_status_time and,
    once expected_finish_time has passed, reports the job as follows:
      - 'Aborted' (exit -1) if it was drawn to fail;
      - 'Done (Success)' (exit 0) otherwise.
    Before that, the 'status' entry is left as None.

    has_id -- when true, 'id' and 'name' are filled in from the params.

    Returns a dict with keys id, name, status, exit, reason, is_node and
    destination.
    '''
    logger.debug('job status command: _status(jobid=%s,has_id=%d)',jobid,has_id)

    info = { 'id' : None,
             'name' : None,
             'status' : None,
             'exit' : None,
             'reason' : None,
             'is_node': False,
             'destination' : 'anywhere' }

    # NOTE(security): eval() of a params file, presumably the literal written
    # by _submit(); acceptable only for locally generated files.
    with open(self._params_filename(jobid)) as f:
        params = eval(f.read())

    sleep(config['single_status_time'])

    assert params['jobid'] == jobid

    if has_id:
        info['id'] = params['jobid']
        info['name'] = params['nodename']

    logger.debug('current_time-expected_finish_time = %d',time.time() - params['expected_finish_time'])

    if time.time() > params['expected_finish_time']:
        if params['should_fail']:
            info['status'] = 'Aborted'
            info['reason'] = 'for no reason'
            info['exit'] = -1
            # Aborted jobs get their finish time recorded here; successful
            # ones get it when their output is retrieved.
            self.ganga_finish_time[jobid] = time.time()
        else:
            info['status'] = 'Done (Success)'
            info['exit'] = 0
            info['reason'] = 'for a reason'

    logger.debug('_status (jobid=%s) -> %s',jobid,repr(info))

    return info
229
230
def status(self,jobids, is_collection=False):
    '''Query the status of jobs on the grid.
    If is_collection is False then jobids is a list of non-split jobs or emulated bulk subjobs of a single master job.
    If is_collection is True then jobids is a list of master jobs which are natively bulk.

    Returns a flat list of info dicts (as built by _status). For a collection,
    each master's entry is followed by one entry per node, with
    is_node=True.
    '''

    logger.debug('job status command: status(jobid=%s,is_collection=%d)',jobids,is_collection)

    info = []

    for jobid in jobids:
        if is_collection:
            sleep(config['master_status_time'])
            info.append(self._status(jobid,True))

            with open(self._params_filename(jobid)) as f:
                params = eval(f.read())

            # Node ids are reported only after the master's resolve deadline.
            has_id = time.time() > params['expected_job_id_resolve_time']
            for sid in params['subjob_ids']:
                node_info = self._status(sid,has_id)
                node_info['is_node'] = True
                info.append(node_info)
        else:
            info.append(self._status(jobid,True))

    return info
258
260 '''Fetch the logging info of the given job and save the output in the jobs outputdir'''
# NOTE(review): despite the docstring, this simulated version fetches and
# saves nothing; it always returns an empty string.
261
262 return ""
263
def get_output(self,jobid,directory,wms_proxy=False):
    '''Retrieve the output of a job on the grid (simulated).

    No files are written into `directory`, and wms_proxy is unused. After the
    configured get_output_time delay, the retrieval time is stored in
    ganga_finish_time under jobid.

    Returns the tuple (True, None).
    '''
    logger.debug('job get output command: get_output(jobid=%s,directory=%s)', jobid, directory)
    sleep(config['get_output_time'])
    retrieved_at = time.time()
    self.ganga_finish_time[jobid] = retrieved_at
    return True, None
271
def cancel(self,jobid):
    '''Cancel a job by delegating to self._cancel(); returns its result.'''
    logger.debug('job cancel command: cancel(jobid=%s)',jobid)
    outcome = self._cancel(jobid)
    return outcome
277
@staticmethod
def expandjdl(items):
    '''Expand jdl items; the simulator simply returns repr(items).'''
    expanded = repr(items)
    return expanded
284