Package Ganga :: Package Lib :: Package LCG :: Package GridSimulator :: Module GridSimulator
[hide private]
[frames | no frames]

Source Code for Module Ganga.Lib.LCG.GridSimulator.GridSimulator

## from Ganga.Utility.Config import makeConfig, ConfigError
from Ganga.Utility.Config import getConfig, makeConfig, ConfigError
from Ganga.Utility.logging import getLogger
from Ganga.Utility.util import isStringLike

from Ganga.Utility.GridShell import getShell
from Ganga.Lib.LCG.GridCache import GridCache

from Ganga.GPIDev.Credentials.ICredential import ICredential
 10   
 11  logger = getLogger() 
 12   
 13  logger.critical('LCG Grid Simulator ENABLED') 
 14   
 15  ######################################################################################## 
 16  ## GRID SIMULATOR 
 17  ######################################################################################## 
 18   
 19  config = makeConfig('GridSimulator','Grid Simulator configuration parameters') 
 20   
 21  config.addOption('submit_time','random.uniform(1,10)','python expression which returns the time it takes (in seconds) to complete the Grid.submit() command (also for subjob in bulk emulation)') 
 22  config.addOption('submit_failure_rate',0.0,'probability that the Grid.submit() method fails') 
 23   
 24  config.addOption('cancel_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the Grid.cancel() command (also for subjob in bulk emulation)') 
 25  config.addOption('cancel_failure_rate',0.0,'probability that the Grid.cancel() method fails') 
 26   
 27  config.addOption('status_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the status command (also for subjob in bulk emulation)') 
 28   
 29  config.addOption('get_output_time','random.uniform(1,5)','python expression which returns the time it takes (in seconds) to complete the get_output command (also for subjob in bulk emulation)') 
 30   
 31  #config.addOption('bulk_submit_time','random.uniform(1,2)','python expression which returns the time it takes (in seconds) to complete the submission of a single job within the Grid.native_master_submit() command') 
 32  #config.addOption('bulk_submit_failure_rate',0.0,'probabilty that the Grid.native_master_submit() fails') 
 33   
 34  #config.addOption('bulk_cancel_time','random.uniform(1,2)','python expression which returns the time it takes (in seconds) to complete the cancellation of a single job within the Grid.native_master_cancel() command') 
 35  #config.addOption('bulk_cancel_failure_rate',0.0,'probabilty that the Grid.native_master_cancel() fails') 
 36   
 37  config.addOption('job_id_resolved_time','random.uniform(1,2)','python expression which returns the time it takes (in seconds) to complete the resolution of all the id of a subjob (when submitted in bulk) this is the time the NODE_ID becomes available from the monitoring)') 
 38   
 39  #config.addOption('job_scheduled_time','random.uniform(10,20)', 'python expression which returns the time the job stays in the scheduled state') 
 40  #config.addOption('job_running_time','random.uniform(10,20)', 'python expression which returns the time the job stays in the running state') 
 41  config.addOption('job_finish_time', 'random.uniform(10,20)', 'python expression which returns the time when the job enters the Done success or Failed state' ) 
 42  config.addOption('job_failure_rate', 0.0, 'probability of the job to enter the Failed state') 
 43   
def sleep(val):
    '''Block the caller for the number of seconds described by val.

    val is a number or a python expression string, resolved with
    get_number(). A negative result is treated as "no delay", because
    time.sleep() raises an error when given a negative value.
    '''
    import time
    time.sleep(max(0, get_number(val)))
47
def failed(val):
    '''Draw a random outcome: True means "the simulated operation fails".

    val is the failure probability (a number, or an expression string
    accepted by get_number()). The result is True when a fresh sample
    from random.random() falls below that probability.
    '''
    # Resolve the probability first: evaluating an expression may itself
    # draw from the random generator, so the order of the two calls matters.
    probability = get_number(val)
    import random
    sample = random.random()
    return sample < probability
52
def get_number(val):
    '''Turn a configuration value into a number.

    val -- either a number, which is used as is, or a string holding a
           python expression which is evaluated with the 'random' module
           as its only explicitly provided global name.

    Returns the resulting int or float. If the expression cannot be
    evaluated, or the result is not an int or float (booleans are rejected
    too), an error is logged and 0 is returned.
    '''
    import random
    if isinstance(val, str):
        try:
            # NOTE(review): eval() runs arbitrary code taken from the
            # configuration; only acceptable while the configuration is trusted.
            t = eval(val, {'random': random})
        except Exception:
            # A malformed expression is handled like any other invalid value.
            logger.error('problem with configuration option, invalid value: %s', val)
            return 0
    else:
        t = val
    # bool is a subclass of int but is not a meaningful time or probability.
    if isinstance(t, bool) or not isinstance(t, (int, float)):
        logger.error('problem with configuration option, invalid value: %s', val)
        return 0
    return t
import os
import time

# Module-level marker string. It is not referenced anywhere else in the
# visible source; presumably external code reads it -- TODO confirm.
cmd = 'simulation'
class GridSimulator:
    '''Simulator of LCG interactions.

    Offers submit / status / cancel / get_output style methods that never
    contact any middleware. Delays and failure probabilities come from the
    module-level 'config' object.

    State is kept on disk:
      * two shelve databases created in the current directory: one maps a
        job id to the job input directory (and holds the '_job_count'
        counter), the other stores finish times recorded per job id;
      * a 'params' file in every job input directory, holding the repr()
        of the dictionary of runtime parameters of that job.
    '''

    middleware = 'GLITE'

    credential = None

    def __init__(self, middleware='GLITE'):
        '''Open (or create) the simulator data files.

        middleware -- middleware name, stored upper-cased in self.middleware
        '''
        self.active = True
        self.middleware = middleware.upper()
        self.credential = ICredential()  # FIXME: or the real one
        #import Ganga.Core.FileWorkspace
        #basedir = Ganga.Core.FileWorkspace.gettop()
        #basedir = '/tmp'
        basedir = '.'
        self.gridmap_filename = '%s/lcg_simulator_gridmap' % basedir
        import shelve
        # map Grid job id into inputdir (where JDL file is)
        self.jobid_map = shelve.open(self.gridmap_filename, writeback=False)
        self.jobid_map.setdefault('_job_count', 0)

        # here we store the job finish times as seen by ganga
        self.finished_jobs_filename = '%s/lcg_simulator_finished_jobs' % basedir
        self.ganga_finish_time = shelve.open(self.finished_jobs_filename, writeback=False)

        # NOTE(review): the shell is always requested for 'GLITE', the
        # middleware argument is not forwarded -- confirm this is intended.
        self.shell = getShell('GLITE')

        logger.critical('Grid Simulator data files: %s %s', self.gridmap_filename, self.finished_jobs_filename)

    def _load_repr(path):
        '''Return the python object whose repr() text is stored in the file at path.'''
        f = open(path)
        try:
            text = f.read()
        finally:
            f.close()
        # NOTE(review): eval() executes whatever the file contains; only
        # acceptable while these files are written by the simulator itself.
        return eval(text)

    _load_repr = staticmethod(_load_repr)

    def _write_text(path, text, mode='w'):
        '''Write text to the file at path (mode 'w' or 'a') and close the file.'''
        f = open(path, mode)
        try:
            f.write(text)
        finally:
            f.close()

    _write_text = staticmethod(_write_text)

    def check_proxy(self):
        '''Always report a valid proxy.'''
        return True

    def submit(self, jdlpath, ce=None):
        '''This method is used for normal and native bulk submission supported by GLITE middleware.

        jdlpath -- path of a file holding the repr() of the JDL dictionary
        ce      -- only logged and passed on to _submit()

        For a JDL of type 'collection' every node listed in the 'Nodes'
        attribute is submitted first, then the master job. Returns the id
        of the master job, or None if its submission failed.
        '''

        logger.debug('job submit command: submit(jdlpath=%s,ce=%s)', jdlpath, ce)

        jdl = self._load_repr(jdlpath)

        subjob_ids = []
        if jdl['Type'] == 'collection':
            import re
            # we need to parse the Nodes attribute string here
            # NOTE(review): \S* is greedy and also swallows any non-blank
            # text following the path (closing quote included); only the
            # directory part of the path is used afterwards.
            node_re = re.compile(r'.*NodeName = "(gsj_\d+)"; file="(\S*)"*')
            # the first and the last line of the Nodes string are skipped
            for line in jdl['Nodes'].splitlines()[1:-1]:
                m = node_re.match(line)
                if not m:
                    continue
                nodename, sjdl_path = m.groups()
                sid = self._submit(sjdl_path, ce, [], nodename=nodename)
                if sid is None:
                    # A failed subjob has no id; recording None would break
                    # the later status() lookup of the subjobs.
                    logger.warning('Subjob %s failed to submit and is left out of the collection.', nodename)
                    continue
                subjob_ids.append(sid)

        masterid = self._submit(jdlpath, ce, subjob_ids)

        return masterid

    def _params_filename(self, jobid):
        '''Return the path of the params file of the given job.'''
        inputdir = os.path.realpath(self.jobid_map[jobid])
        return os.path.join(inputdir, 'params')

    def _submit(self, jdlpath, ce, subjob_ids, nodename=None):
        '''Submit a JDL file to LCG

        Only the directory of jdlpath is used: the runtime parameters of
        the job are written to the 'params' file in that directory.
        Returns the new job id, or None if the simulated submission failed.
        '''

        logger.debug('job submit command: _submit(jdlpath=%s,ce=%s,subjob_ids=%s)', jdlpath, ce, subjob_ids)

        inputdir = os.path.dirname(os.path.realpath(jdlpath))

        def write():
            self._write_text(os.path.join(inputdir, 'params'), repr(runtime_params))

        runtime_params = {}
        runtime_params['submission_time_start'] = time.time()

        sleep(config['submit_time'])
        runtime_params['submission_time_stop'] = time.time()

        if failed(config['submit_failure_rate']):
            runtime_params['status'] = 'failed_to_submit'
            write()
            logger.warning('Job submission failed.')
            return

        jobid = self._make_new_id()

        self.jobid_map[jobid] = inputdir

        runtime_params['jobid'] = jobid
        runtime_params['status'] = 'submitted'
        runtime_params['should_fail'] = failed(config['job_failure_rate'])
        # Stored as an absolute time (like expected_finish_time) because
        # status() compares it against time.time().
        runtime_params['expected_job_id_resolve_time'] = time.time() + get_number(config['job_id_resolved_time'])
        runtime_params['expected_finish_time'] = time.time() + get_number(config['job_finish_time'])
        runtime_params['subjob_ids'] = subjob_ids
        runtime_params['nodename'] = nodename
        write()
        return jobid

    def _make_new_id(self):
        '''Increment the persistent job counter and return a new job id built from it.'''
        self.jobid_map['_job_count'] += 1
        jobid = 'https://ganga.simulator.cern.ch/%d' % self.jobid_map['_job_count']
        return jobid

    def _cancel(self, jobid):
        '''Simulate the cancellation of one job.

        Appends a line with the outcome to the params file of the job.
        Returns True if the cancellation succeeded, False otherwise.
        Raises KeyError for an unknown job id.
        '''
        # Look the job up first, so that an unknown id raises before the
        # simulated delay.
        self.jobid_map[jobid]

        sleep(config['cancel_time'])
        if failed(config['cancel_failure_rate']):
            note, outcome = '\n failed to cancel: %d', False
        else:
            note, outcome = '\ncancelled: %d', True
        # NOTE(review): the appended line is not python syntax, so reading
        # the params file back with eval (as _status does) will presumably
        # fail for this job afterwards -- confirm whether that is intended.
        self._write_text(self._params_filename(jobid), note % time.time(), 'a')
        return outcome

    def native_master_cancel(self, jobid):
        '''Native bulk cancellation supported by GLITE middleware.'''

        logger.debug('job cancel command: native_master_cancel(jobid=%s)', jobid)

        # FIXME: TODO: emulate bulk!
        return self._cancel(jobid)

    def _status(self, jobid, has_id):
        '''Return the status dictionary of a single job.

        has_id -- when true, 'id' and 'name' are filled in from the params
                  file; otherwise they stay None.

        'status', 'exit' and 'reason' stay None until the expected finish
        time of the job has passed. For a job that finishes as 'Aborted'
        the current time is also recorded in self.ganga_finish_time.
        '''
        logger.debug('job status command: _status(jobid=%s,has_id=%d)', jobid, has_id)

        info = {'id': None,
                'name': None,
                'status': None,
                'exit': None,
                'reason': None,
                'is_node': False,
                'destination': 'anywhere'}

        params = self._load_repr(self._params_filename(jobid))

        # 'status_time' is the only status delay registered in the configuration.
        sleep(config['status_time'])

        assert params['jobid'] == jobid

        if has_id:
            info['id'] = params['jobid']
            info['name'] = params['nodename']

        #if is_collection and time.time() > params['expected_job_id_resolve_time']:
        #    info['name'] = 'node_%d' % 0 # FIXME: some number (read from jdl?)

        logger.debug('current_time-expected_finish_time = %d', time.time() - params['expected_finish_time'])

        if time.time() > params['expected_finish_time']:
            if params['should_fail']:
                info['status'] = 'Aborted'
                info['reason'] = 'for no reason'
                info['exit'] = -1
                self.ganga_finish_time[jobid] = time.time()
            else:
                info['status'] = 'Done (Success)'
                info['exit'] = 0
                info['reason'] = 'for a reason'

        logger.debug('_status (jobid=%s) -> %s', jobid, repr(info))

        #PENDING: handle other statuses: 'Running','Aborted','Cancelled','Done (Exit Code !=0)','Cleared'
        return info

    def status(self, jobids, is_collection=False):
        '''Query the status of jobs on the grid.
        If is_collection is False then jobids is a list of non-split jobs or emulated bulk subjobs of a single master job.
        If is_collection is True then jobids is a list of master jobs which are natively bulk.

        Returns a flat list of status dictionaries. For a collection the
        entry of each master job is followed by one entry per subjob,
        marked with 'is_node' set to True.
        '''

        logger.debug('job status command: status(jobid=%s,is_collection=%d)', jobids, is_collection)

        info = []

        for jobid in jobids:
            if is_collection:
                # 'status_time' is the only status delay registered in the configuration.
                sleep(config['status_time'])
                info.append(self._status(jobid, True))
                params = self._load_repr(self._params_filename(jobid))
                # subjob ids and names are reported only once the resolve
                # time of the master job has passed
                has_id = time.time() > params['expected_job_id_resolve_time']
                for sid in params['subjob_ids']:
                    node_info = self._status(sid, has_id)
                    node_info['is_node'] = True
                    info.append(node_info)
            else:
                info.append(self._status(jobid, True))

        return info

    def get_loginfo(self, jobid, directory, verbosity=1):
        '''Fetch the logging info of the given job and save the output in the jobs outputdir

        The simulator has no logging info: an empty string is returned and
        nothing is written.
        '''

        return ""

    def get_output(self, jobid, directory, wms_proxy=False):
        '''Retrieve the output of a job on the grid

        Waits for the simulated retrieval time, records the current time
        as the finish time of the job and returns (True, None). No file is
        written to the given directory.
        '''

        logger.debug('job get output command: get_output(jobid=%s,directory=%s)', jobid, directory)
        sleep(config['get_output_time'])
        self.ganga_finish_time[jobid] = time.time()
        return (True, None)

    def cancel(self, jobid):
        '''Cancel a job'''
        logger.debug('job cancel command: cancel(jobid=%s)', jobid)

        return self._cancel(jobid)

    def expandjdl(items):
        '''Expand jdl items

        Returns repr(items), the format that submit() reads back.
        '''

        return repr(items)

    expandjdl = staticmethod(expandjdl)