Package Ganga :: Package GPIDev :: Package Adapters :: Module IBackend
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.GPIDev.Adapters.IBackend

  1  ################################################################################ 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: IBackend.py,v 1.2 2008-10-02 10:31:05 moscicki Exp $ 
  5  ################################################################################ 
  6   
  7  from Ganga.GPIDev.Base import GangaObject 
  8  from Ganga.GPIDev.Schema import * 
  9   
 10  import Ganga.Utility.logging 
 11  logger = Ganga.Utility.logging.getLogger() 
 12   
 13  import os 
 14   
 15  import datetime 
 16  import time 
 17   
18 -class IBackend(GangaObject):
19 """ 20 Base class for all backend objects. 21 22 Backend classes in Ganga 4.0.x have only the submit() method. 23 This is sufficient to allow emulated bulk submission to function. 24 25 Note that the master_submit() method is always called by the 26 framework unlike the submit() method, which is only called as 27 a part of default implementation of master_submit(). If you 28 provide a special support for bulk submission than you should 29 either call submit() method explicitly in the case of no 30 splitting, or make the implementation of master_submit() handle 31 the non-split jobs as well. 32 33 """ 34 35 _schema = Schema(Version(0,0), {}) 36 _category = 'backends' 37 _hidden = 1 38 39 # specify how the default implementation of master_prepare() method creates the input sandbox 40 _packed_input_sandbox = True 41
42 - def __init__(self):
43 super(IBackend,self).__init__()
44 ## import sys 45 ## frame = sys._getframe(1) 46 ## logger = Ganga.Utility.logging.getLogger(frame=frame) 47 ## del frame 48 ## from Ganga.Utility.util import GenericWrapper, wrap_callable_filter 49 ## def before(args,kwargs): 50 ## args[0] = 'hello'+args[0] 51 ## return args,kwargs 52 ## def after(): 53 ## pass 54 ## self.logger = GenericWrapper(logger,before,after, forced = ['info','error','warning','critical', 'debug'], wrapper_function=wrap_callable_filter) 55 56
57 - def setup(self):
58 """ This is a hook called for each job when Ganga.Core services are 59 started up. The hook is called before the monitoring subsystem is 60 enabled. This hook may be used by some backends to do specialized setup 61 (e.g. to open ssh transport pipes of the Remote backend) 62 """ 63 pass
64
65 - def master_submit(self,rjobs,subjobconfigs,masterjobconfig,keep_going=False):
66 67 """ Submit the master job and all its subjobs. The 68 masterjobconfig is shared, individual subjob configs are 69 defined in subjobconfigs. Submission of individual jobs 70 (not-split) also always goes via this method. In that case 71 the subjobconfigs contains just one element - the job itself. 72 73 The default implementation of this method emulates the bulk 74 submission calling a submit() method on individual subjob 75 objects. If submission of any of the subjobs fails then the 76 whole process is aborted with IncompleteSubmissionError 77 exception. The subjobs which have already been submitted stay 78 submitted. 79 80 The default implementation does not process the masterjobconfig. 81 Therefore this method may be overriden in the derived class 82 in the following way: 83 84 def master_submit(self,masterjobconfig,subjobconfigs,keep_going): 85 ... 86 do_some_processsing_of(masterjobconfig) 87 ... 88 return IBackend.master_submit(self,subjobconfigs,masterjobconfig,keep_joing) 89 90 91 Implementation note: we set keep_going to be optional in the 92 signature of IBackend.master_submit() to allow the existing 93 backend implementations which do not support keep_going=True 94 and which at some point may call IBackend.master_submit() to 95 work without change. It may sometimes be non-trivial to enable 96 support for keep_going=True in some backends, even if the 97 finally call IBackend.master_submit(). Therefore it is left to 98 the decision of backend developer to explicitly enable the 99 support for keep_going flag. 100 101 """ 102 from Ganga.Core import IncompleteJobSubmissionError, GangaException 103 from Ganga.Utility.logging import log_user_exception 104 105 job = self.getJobObject() 106 assert(implies(rjobs,len(subjobconfigs)==len(rjobs))) 107 108 incomplete = 0 109 incomplete_subjobs = [] 110 111 def handleError(x): 112 if keep_going: 113 incomplete_subjobs.append(fqid) 114 return False 115 else: 116 if incomplete: 117 raise x 118 else: 119 return True
120 121 master_input_sandbox=self.master_prepare(masterjobconfig) 122 123 for sc,sj in zip(subjobconfigs,rjobs): 124 fqid = sj.getFQID('.') 125 logger.info("submitting job %s to %s backend",fqid,sj.backend._name) 126 try: 127 b = sj.backend 128 sj.updateStatus('submitting') 129 if b.submit(sc,master_input_sandbox): 130 sj.updateStatus('submitted') 131 #sj._commit() # PENDING: TEMPORARY DISABLED 132 incomplete = 1 133 else: 134 if handleError(IncompleteJobSubmissionError(fqid,'submission failed')): 135 return 0 136 except Exception,x: 137 sj.updateStatus('new') 138 if isinstance(x,GangaException): 139 logger.error(str(x)) 140 log_user_exception(logger,debug = True) 141 else: 142 log_user_exception(logger,debug = False) 143 if handleError(IncompleteJobSubmissionError(fqid,str(x))): 144 return 0 145 146 if incomplete_subjobs: 147 raise IncompleteJobSubmissionError(incomplete_subjobs,'submission failed') 148 149 return 1
150
151 - def submit(self,subjobconfig,master_input_sandbox):
152 """ 153 Submit an individual job. Return 1 in case of success. 154 155 master_input_sandbox is a list of file-names which is shared by all subjobs. 156 157 This method is not called directly by the framework. It is 158 only called by the default implementation of master_submit() 159 method. 160 161 Therefore if the implementation of master_submit() is able to 162 cope with submission of individual jobs, then submit() is 163 redundant. 164 165 Transition from Ganga 4.0.x: 166 - subjobconfig is equvalent to jobconfig in the older interface. 167 - jobid is an obsolte parameter which will be removed in the future 168 169 """ 170 raise NotImplementedError
171 172
173 - def master_prepare(self,masterjobconfig):
174 """ Prepare the master job (shared sandbox files). This method is/should be called by master_submit() exactly once. 175 The input sandbox is created according to self._packed_input_sandbox flag (a class attribute) 176 """ 177 178 job = self.getJobObject() 179 files = [] 180 if masterjobconfig: 181 files = masterjobconfig.getSandboxFiles() # FIXME: assume that all jobconfig object have getSandboxFiles() method 182 else: 183 files = self.getJobObject().inputsandbox # RTHandler is not required to produce masterjobconfig, in that case just use the inputsandbox 184 185 if self._packed_input_sandbox: 186 return job.createPackedInputSandbox(files,master=True) 187 else: 188 return job.createInputSandbox(files,master=True)
189 190
191 - def master_auto_resubmit(self,rjobs):
192 """ A hook in case someone wanted to override the auto 193 resubmission behaviour. Otherwise, it auto_resubmit is 194 equivalent for all practical purposes to a normal resubmit (so 195 it automatcially benefits from bulk resubmit if implemented). 196 """ 197 return self.master_resubmit(rjobs)
198
199 - def check_auto_resubmit(self):
200 """ A hook for the backend to check if this job can be 201 automatically resubmitted. 202 """ 203 return True
204
205 - def master_resubmit(self,rjobs,backend=None):
206 """ Resubmit (previously submitted) job. Configuration phase is skipped. 207 Default implementation works is an emulated-bulk operation. 208 If you override this method for bulk optimization then make sure that you call updateMasterJobStatus() on the master job, 209 so the master job will be monitored by the monitoring loop. 210 """ 211 from Ganga.Core import IncompleteJobSubmissionError, GangaException 212 from Ganga.Utility.logging import log_user_exception 213 incomplete = 0 214 def handleError(x): 215 if incomplete: 216 raise x 217 else: 218 return 0
219 try: 220 for sj in rjobs: 221 fqid = sj.getFQID('.') 222 logger.info("resubmitting job %s to %s backend",fqid,sj.backend._name) 223 try: 224 b = sj.backend 225 sj.updateStatus('submitting') 226 if backend is None: 227 result = b.resubmit() 228 else: 229 result = b.resubmit(backend=backend) 230 if result: 231 sj.updateStatus('submitted') 232 #sj._commit() # PENDING: TEMPORARY DISABLED 233 incomplete = 1 234 else: 235 return handleError(IncompleteJobSubmissionError(fqid,'resubmission failed')) 236 except Exception,x: 237 log_user_exception(logger,debug=isinstance(x,GangaException)) 238 return handleError(IncompleteJobSubmissionError(fqid,str(x))) 239 finally: 240 master = self.getJobObject().master 241 if master: 242 master.updateMasterJobStatus() 243 return 1 244
245 - def resubmit(self):
246 raise NotImplementedError
247
248 - def master_kill(self):
249 """ Kill a job and all its subjobs. Return 1 in case of success. 250 251 The default implementation uses the kill() method and emulates 252 the bulk operation on all subjobs. It tries to kill as many 253 subjobs as possible even if there are failures. If the 254 operation is incomplete then raise IncompleteKillError(). 255 """ 256 257 job = self.getJobObject() 258 259 r = True 260 261 if len(job.subjobs): 262 problems = [] 263 for s in job.subjobs: 264 if s.status in ['submitted','running']: 265 if not s.backend.kill(): 266 r = False 267 problems.append(s.id) 268 if not r: 269 from Ganga.Core import IncompleteKillError 270 raise IncompleteKillError('subjobs %s were not killed'%problems) 271 else: 272 r = job.backend.kill() 273 return r
274
275 - def kill(self):
276 277 """ Kill a job (and also all its subjobs). This method is 278 never called by the framework directly. It may only be called 279 by the default implementation of master_kill(). """ 280 281 raise NotImplementedError
282
283 - def peek( self, filename = "", command = "" ):
284 """ 285 Allow viewing of job output files on the backend 286 (i.e. while job is in 'running' state) 287 288 Arguments other than self: 289 filename : name of file to be viewed 290 => Path specified relative to work directory 291 command : command to be used for file viewing 292 293 Return value: None 294 """ 295 296 # This is a dummy implementation - it only provides access 297 # to files in the job's output directory 298 299 job = self.getJobObject() 300 job.peek( filename = os.path.join( "..", filename ), command = command ) 301 return None
302
303 - def remove(self):
304 """When the job is removed then this backend method is called. 305 The primary use-case is the Remote (ssh) backend. """ 306 pass
307
308 - def getStateTime(self, status):
309 """Get the timestamps for the job's transitions into the 'running' and 'completed' states. 310 """ 311 return None
312
313 - def timedetails(self):
314 """Returns all available backend specific timestamps. 315 """ 316 pass
317 318
319 - def master_updateMonitoringInformation(jobs):
320 321 """ Update monitoring information for jobs: jobs is a list of 322 jobs in this backend which require monitoring (either 323 'submitted' or 'running' state). The jobs list never contains 324 the subjobs. 325 326 The default implementation iterates over subjobs and calls 327 updateMonitoringInformation(). 328 """ 329 330 simple_jobs = [] 331 332 for j in jobs: 333 if len(j.subjobs): 334 monitorable_subjobs = [s for s in j.subjobs if s.status in ['submitted','running'] ] 335 logger.debug('Monitoring subjobs: %s',repr([jj._repr() for jj in monitorable_subjobs])) 336 j.backend.updateMonitoringInformation(monitorable_subjobs) 337 j.updateMasterJobStatus() 338 else: 339 simple_jobs.append(j) 340 341 if simple_jobs: 342 logger.debug('Monitoring jobs: %s',repr([jj._repr() for jj in simple_jobs])) 343 simple_jobs[0].backend.updateMonitoringInformation(simple_jobs)
344 345 master_updateMonitoringInformation = staticmethod(master_updateMonitoringInformation) 346 347
348 - def updateMonitoringInformation(jobs):
349 """ Update monitoring information for individual jobs: jobs is 350 a list which may contain subjobs as well as the non-split 351 jobs. This method is never called by the framework directly. 352 It may only be called by the default implementation of 353 master_updateMonitoringInformation(). 354 """ 355 356 raise NotImplementedError
357 358 updateMonitoringInformation = staticmethod(updateMonitoringInformation) 359