1
2
3
4
5
6
7 from Ganga.GPIDev.Base import GangaObject
8 from Ganga.GPIDev.Schema import *
9
10 import Ganga.Utility.logging
11 logger = Ganga.Utility.logging.getLogger()
12
13 import os
14
15 import datetime
16 import time
17
19 """
20 Base class for all backend objects.
21
22 Backend classes in Ganga 4.0.x have only the submit() method.
23 This is sufficient to allow emulated bulk submission to function.
24
25 Note that the master_submit() method is always called by the
26 framework, unlike the submit() method, which is only called as
27 part of the default implementation of master_submit(). If you
28 provide special support for bulk submission then you should
29 either call the submit() method explicitly in the case of no
30 splitting, or make the implementation of master_submit() handle
31 the non-split jobs as well.
32
33 """
34
# Base schema: version 0.0 with an empty attribute dictionary.
35 _schema = Schema(Version(0,0), {})
# NOTE(review): presumably the plugin category under which backend
# classes are registered - confirm against the plugin registry.
36 _category = 'backends'
# NOTE(review): presumably hides this abstract base class from
# user-facing plugin listings - confirm.
37 _hidden = 1
38
39
# NOTE(review): presumably tells the framework whether this backend
# expects a packed (archived) input sandbox - confirm where it is read.
40 _packed_input_sandbox = True
44
45
46
47
48
49
50
51
52
53
54
55
56
58 """ This is a hook called for each job when Ganga.Core services are
59 started up. The hook is called before the monitoring subsystem is
60 enabled. This hook may be used by some backends to do specialized setup
61 (e.g. to open ssh transport pipes of the Remote backend).
62 """
# Default implementation does nothing; backends override it when needed.
63 pass
64
def master_submit(self, rjobs, subjobconfigs, masterjobconfig, keep_going=False):
    """Submit the master job and all its subjobs.

    The masterjobconfig is shared by all subjobs; the individual subjob
    configurations are given in subjobconfigs, paired element-wise with
    rjobs. Submission of individual (non-split) jobs also always goes via
    this method; in that case subjobconfigs contains just one element -
    the job itself.

    The default implementation emulates bulk submission: it calls
    master_prepare(masterjobconfig) once, then submit() on each subjob's
    backend, passing the result of master_prepare() as the shared
    master_input_sandbox. It does not otherwise process masterjobconfig,
    so a derived class may override this method like this:

        def master_submit(self, rjobs, subjobconfigs, masterjobconfig, keep_going=False):
            ...
            do_some_processing_of(masterjobconfig)
            ...
            return IBackend.master_submit(self, rjobs, subjobconfigs,
                                          masterjobconfig, keep_going)

    Failure handling. A subjob whose submission fails is reset to 'new';
    subjobs that were already submitted stay submitted.

      * keep_going=False: if no subjob has been submitted yet, return 0.
        Otherwise raise IncompleteJobSubmissionError for the failed
        subjob, which aborts the remaining submissions.
      * keep_going=True: carry on with the remaining subjobs, then raise
        IncompleteJobSubmissionError listing the ids of all failed
        subjobs.

    Implementation note: keep_going is optional in the signature of
    IBackend.master_submit(). This lets existing backend implementations
    that do not support keep_going=True, and that at some point call
    IBackend.master_submit(), keep working unchanged. Enabling
    keep_going=True may be non-trivial for some backends even if they
    eventually call IBackend.master_submit(). Enabling the flag is
    therefore left to the backend developer, explicitly.

    Returns 1 if every subjob was submitted, or 0 as described above.
    """
    import sys
    from Ganga.Core import IncompleteJobSubmissionError, GangaException
    from Ganga.Utility.logging import log_user_exception

    # "rjobs implies one config per job": with jobs given, the two
    # sequences must pair up exactly.
    assert not rjobs or len(subjobconfigs) == len(rjobs)

    submitted_any = False    # at least one subjob reached 'submitted'
    incomplete_subjobs = []  # fqids of failed subjobs (keep_going mode)

    master_input_sandbox = self.master_prepare(masterjobconfig)

    for sc, sj in zip(subjobconfigs, rjobs):
        fqid = sj.getFQID('.')
        logger.info("submitting job %s to %s backend", fqid, sj.backend._name)
        b = sj.backend
        error = None
        try:
            sj.updateStatus('submitting')
            if b.submit(sc, master_input_sandbox):
                sj.updateStatus('submitted')
                submitted_any = True
            else:
                error = IncompleteJobSubmissionError(fqid, 'submission failed')
        except Exception:
            # sys.exc_info() keeps this valid on both old and new Pythons.
            x = sys.exc_info()[1]
            if isinstance(x, GangaException):
                logger.error(str(x))
                log_user_exception(logger, debug=True)
            else:
                log_user_exception(logger, debug=False)
            error = IncompleteJobSubmissionError(fqid, str(x))

        if error is None:
            continue

        # Handle the failure outside the try block. Otherwise an
        # IncompleteJobSubmissionError raised here would be caught, logged
        # and re-wrapped by the except clause above.
        sj.updateStatus('new')
        if keep_going:
            incomplete_subjobs.append(fqid)
        elif submitted_any:
            raise error
        else:
            return 0

    if incomplete_subjobs:
        raise IncompleteJobSubmissionError(incomplete_subjobs, 'submission failed')

    return 1
150
def submit(self, subjobconfig, master_input_sandbox):
    """Submit an individual job. Return 1 in case of success.

    master_input_sandbox is a list of file names shared by all subjobs.
    The default master_submit() passes the value returned by
    master_prepare() here.

    This method is not called directly by the framework. It is only
    called by the default implementation of master_submit(). If a
    backend's master_submit() can cope with the submission of individual
    jobs itself, then submit() is redundant.

    Transition from Ganga 4.0.x:
      - subjobconfig is equivalent to jobconfig in the older interface.
      - the obsolete jobid parameter of the older interface is no longer
        part of this signature.

    The base implementation always raises NotImplementedError; concrete
    backends must override it.
    """
    raise NotImplementedError
171
172
189
190
192 """ A hook in case someone wants to override the auto
193 resubmission behaviour. Otherwise, auto_resubmit is
194 equivalent for all practical purposes to a normal resubmit (so
195 it automatically benefits from bulk resubmit if implemented).
196 """
# Delegates to master_resubmit() without passing a backend argument.
197 return self.master_resubmit(rjobs)
198
200 """ A hook for the backend to check if this job can be
201 automatically resubmitted.
202 """
# Default implementation: every job is eligible for automatic resubmission.
203 return True
204
206 """ Resubmit (previously submitted) job. Configuration phase is skipped.
207 The default implementation is an emulated bulk operation.
208 If you override this method for bulk optimization then make sure that you call updateMasterJobStatus() on the master job,
209 so the master job will be monitored by the monitoring loop.
210 """
211 from Ganga.Core import IncompleteJobSubmissionError, GangaException
212 from Ganga.Utility.logging import log_user_exception
# incomplete becomes 1 once at least one subjob has been resubmitted;
# handleError() reads its current value through the closure.
213 incomplete = 0
# Failure policy: return 0 if nothing has been resubmitted yet,
# otherwise raise the given error.
214 def handleError(x):
215 if incomplete:
216 raise x
217 else:
218 return 0
219 try:
220 for sj in rjobs:
221 fqid = sj.getFQID('.')
222 logger.info("resubmitting job %s to %s backend",fqid,sj.backend._name)
223 try:
224 b = sj.backend
225 sj.updateStatus('submitting')
# Pass the backend argument on only when one was given.
226 if backend is None:
227 result = b.resubmit()
228 else:
229 result = b.resubmit(backend=backend)
230 if result:
231 sj.updateStatus('submitted')
232
233 incomplete = 1
234 else:
# NOTE(review): handleError() may raise here, inside the inner try. The
# except clause below then catches that IncompleteJobSubmissionError,
# logs it and re-wraps it with str(x) as the message. This method also
# never resets a failed subjob's status, so it stays 'submitting'
# unless b.resubmit() changed it.
235 return handleError(IncompleteJobSubmissionError(fqid,'resubmission failed'))
236 except Exception,x:
237 log_user_exception(logger,debug=isinstance(x,GangaException))
238 return handleError(IncompleteJobSubmissionError(fqid,str(x)))
239 finally:
# Executed on every exit from the try (normal completion, early return
# or exception), so the master job, if any, gets its status refreshed.
240 master = self.getJobObject().master
241 if master:
242 master.updateMasterJobStatus()
243 return 1
244
# NOTE(review): the def line of this method is not visible here. It is
# presumably resubmit(), which the default master_resubmit() calls on each
# subjob's backend. Backends must override it.
246 raise NotImplementedError
247
249 """ Kill a job and all its subjobs. Return 1 in case of success.
250
251 The default implementation uses the kill() method and emulates
252 the bulk operation on all subjobs. It tries to kill as many
253 subjobs as possible even if there are failures. If the
254 operation is incomplete then raise IncompleteKillError().
255 """
256
257 job = self.getJobObject()
258
259 r = True
260
# Split job: call kill() on the backend of every subjob that is
# 'submitted' or 'running'; other subjobs are skipped. The ids of
# subjobs whose kill() returned a false value are collected.
# NOTE(review): an exception raised by s.backend.kill() is not caught
# here, so it would stop the loop despite the "as many as possible"
# promise above. Also, r starts as True, so when all subjob kills succeed
# the returned value is presumably True rather than the documented 1.
261 if len(job.subjobs):
262 problems = []
263 for s in job.subjobs:
264 if s.status in ['submitted','running']:
265 if not s.backend.kill():
266 r = False
267 problems.append(s.id)
268 if not r:
269 from Ganga.Core import IncompleteKillError
270 raise IncompleteKillError('subjobs %s were not killed'%problems)
271 else:
# Presumably the non-split job case: delegate to the job's own backend.
272 r = job.backend.kill()
273 return r
274
276
277 """ Kill a job (and also all its subjobs). This method is
278 never called by the framework directly. It may only be called
279 by the default implementation of master_kill(). """
280
# NOTE(review): the default master_kill() calls kill() separately on each
# active subjob's backend, so "and also all its subjobs" may be
# misleading - confirm. Backends must override this method.
281 raise NotImplementedError
282
def peek(self, filename="", command=""):
    """View a job output file on the backend (e.g. while the job is running).

    :param filename: name of the file to view, relative to the work directory
    :param command: command used to display the file
    :return: None

    The request is forwarded to the owning job's peek(). The file name is
    prefixed with '..' before it is passed on.
    """
    owner = self.getJobObject()
    relative_path = os.path.join("..", filename)
    owner.peek(filename=relative_path, command=command)
302
304 """When the job is removed then this backend method is called.
305 The primary use-case is the Remote (ssh) backend. """
# Default implementation does nothing; backends override it as needed.
306 pass
307
309 """Get the timestamps for the job's transitions into the 'running' and 'completed' states.
310 """
# Default implementation returns None; backends that record these
# timestamps presumably override it.
311 return None
312
314 """Returns all available backend specific timestamps.
315 """
# Default implementation does nothing, so callers get None.
316 pass
317
318
344
# Pre-decorator idiom: rebind the function of the same name (its def is
# not visible in this view) as a static method.
345 master_updateMonitoringInformation = staticmethod(master_updateMonitoringInformation)
346
347
357
# Same idiom for updateMonitoringInformation.
358 updateMonitoringInformation = staticmethod(updateMonitoringInformation)
359