1 from Ganga.GPIDev.Adapters.IBackend import IBackend
2 from Ganga.GPIDev.Schema import *
3
import Ganga.Utility.logging

# Logger used by every method of this backend.
logger = Ganga.Utility.logging.getLogger()

import Ganga.Utility.Config

# Configuration section 'Local' that holds the options of this backend.
config = Ganga.Utility.Config.makeConfig(
    'Local',
    'parameters of the local backend (jobs in the background on localhost)')

# Read by the backend's workdir clean-up step: when True the job's
# temporary working directory is deleted with shutil.rmtree.
config.addOption(
    'remove_workdir',
    True,
    'remove automatically the local working directory when the job completed')
11
12 import Ganga.Utility.logic, Ganga.Utility.util
13
14 from Ganga.GPIDev.Lib.File import FileBuffer
15
16 import os,sys
17 import os.path,re,errno
18
19 import subprocess
20
21 import datetime
22 import time
23
24 import re
25
"""Run jobs in the background on the local host.

The job is run in the workdir (usually in /tmp).
"""
# Each attribute name must appear only once in this literal: a repeated key
# in a dict literal silently replaces the earlier entry.
_schema = Schema(Version(1,2), {
    'nice'        : SimpleItem(defvalue=0, doc='adjust process priority using nice -n command'),
    'id'          : SimpleItem(defvalue=-1,protected=1,copyable=0,doc='Process id.'),
    'status'      : SimpleItem(defvalue=None,typelist=None,protected=1,copyable=0,hidden=1,doc='*NOT USED*'),
    'exitcode'    : SimpleItem(defvalue=None,typelist=['int','type(None)'],protected=1,copyable=0,doc='Process exit code.'),
    'workdir'     : SimpleItem(defvalue='',protected=1,copyable=0,doc='Working directory.'),
    'actualCE'    : SimpleItem(defvalue='',protected=1,copyable=0,doc='Hostname where the job was submitted.'),
    'wrapper_pid' : SimpleItem(defvalue=-1,protected=1,copyable=0,hidden=1,doc='(internal) process id of the execution wrapper'),
    })
_category = 'backends'
_name = 'Local'
_GUIPrefs = [ { 'attribute' : 'nice', 'widget' : 'String' },
              { 'attribute' : 'id', 'widget' : 'Int' },
              { 'attribute' : 'status' , 'widget' : 'String' },
              { 'attribute' : 'exitcode', 'widget' : 'String' } ]
_GUIAdvancedPrefs = [ { 'attribute' : 'nice', 'widget' : 'String' },
                      { 'attribute' : 'exitcode', 'widget' : 'String' } ]
48
49
50
53
def submit(self, jobconfig, master_input_sandbox):
    """Submit a job by generating its wrapper script and starting it.

    :param jobconfig: application configuration for this job (passed to
        ``preparejob``).
    :param master_input_sandbox: packed master input sandbox files (passed
        to ``preparejob``).
    :return: 1 once ``run`` has returned; ``run``'s own result is ignored.
    """
    wrapper_script = self.preparejob(jobconfig, master_input_sandbox)
    self.run(wrapper_script)
    return 1
57
75
76 - def run(self,scriptpath):
85
def peek(self, filename="", command=""):
    """Display a file from the job's working directory.

    Intended for inspecting output while the job is still in the 'running'
    state.

    :param filename: file to view, given relative to ``self.workdir``; the
        default (empty string) refers to the working directory itself.
    :param command: command used to display the file, forwarded to
        ``job.viewFile``.
    :return: None
    """
    job = self.getJobObject()
    # Trailing separators are stripped from both the base directory and the
    # final path before handing it to the viewer.
    base_dir = self.workdir.rstrip(os.sep)
    target = os.path.join(base_dir, filename)
    job.viewFile(path=target.rstrip(os.sep), command=command)
    return None
103
105 """Obtains the timestamps for the 'running', 'completed', and 'failed' states.
106
107 The __jobstatus__ file in the job's output directory is read to obtain the start and stop times of the job.
108 These are converted into datetime objects and returned to the user.
109 """
110 j = self.getJobObject()
111 end_list = ['completed', 'failed']
112 d = {}
113 checkstr=''
114
115 if status == 'running': checkstr='START:'
116 elif status == 'completed': checkstr='STOP:'
117 elif status == 'failed': checkstr='FAILED:'
118 else:
119 checkstr=''
120
121 if checkstr=='':
122 logger.debug("In getStateTime(): checkstr == ''")
123 return None
124
125 try:
126 p = os.path.join(j.outputdir, '__jobstatus__')
127 logger.debug("Opening output file at: %s", p)
128 f = open(p)
129 except IOError:
130 logger.debug('unable to open file %s', p)
131 return None
132
133 for l in f.readlines():
134 if checkstr in l:
135 pos=l.find(checkstr)
136 timestr=l[pos+len(checkstr)+1:pos+len(checkstr)+25]
137 try:
138 t = datetime.datetime(*(time.strptime(timestr, "%a %b %d %H:%M:%S %Y")[0:6]))
139 except ValueError:
140 logger.debug("Value Error in file: '%s': string does not match required format.", p)
141 return None
142 return t
143
144 logger.debug("Reached the end of getStateTime('%s'). Returning None.", status)
145 return None
146
165
def preparejob(self, jobconfig, master_input_sandbox):
    """Generate the wrapper script that runs one job on the local host.

    The generated script (stored as ``__jobscript__`` in the job's input
    workspace):

    * detaches itself with ``os.setsid()`` when started with the argument
      ``subprocess``;
    * unpacks the input sandbox into the working directory chosen here;
    * starts the application executable (optionally under ``nice``) as a
      child process with ``shell=False`` and polls it until it exits;
    * collects the output sandbox and stdout/stderr/__syslog__ into the
      shared output workspace;
    * records progress in ``__jobstatus__`` as ``START:``, ``PID:``,
      ``EXITCODE:`` and ``STOP:``/``FAILED:`` lines. All timestamps use
      UTC and the format ``%a %b %d %H:%M:%S %Y``.

    Side effect: ``self.workdir`` is set to the newly created working
    directory (from Ganga's ``tempfile.mkdtemp``).

    :param jobconfig: application configuration providing the executable,
        arguments, sandbox files, output patterns (``outputbox``) and
        environment (``env``).
    :param master_input_sandbox: already-packed master sandbox files,
        appended to this job's own packed input sandbox.
    :return: the value returned by the input workspace's ``writefile`` for
        the generated script (``submit`` passes it on to ``run``).
    """
    job = self.getJobObject()
    mon = job.getMonitoringService()
    import Ganga.Core.Sandbox as Sandbox
    subjob_input_sandbox = job.createPackedInputSandbox(
        jobconfig.getSandboxFiles()
        + Sandbox.getGangaModulesAsSandboxFiles(Sandbox.getDefaultModules())
        + Sandbox.getGangaModulesAsSandboxFiles(mon.getSandboxModules()))

    appscriptpath = [jobconfig.getExeString()] + jobconfig.getArgStrings()
    if self.nice:
        # The wrapper starts this command list with shell=False, so the
        # option and its value must be separate argv entries; '-n 5' as a
        # single entry would reach nice(1) with an embedded space.
        appscriptpath = ['nice', '-n', '%d' % self.nice] + appscriptpath
        if self.nice < 0:
            logger.warning('increasing process priority is often not allowed, your job may fail due to this')

    sharedoutputpath = job.getOutputWorkspace().getPath()
    outputpatterns = jobconfig.outputbox
    environment = jobconfig.env

    from Ganga.Utility import tempfile
    workdir = tempfile.mkdtemp()

    # Template of the wrapper script; the ###...### markers are substituted
    # below. Its timestamps must stay in the format parsed by getStateTime.
    script = """#!/usr/bin/env python

import os,os.path,shutil,tempfile
import sys,time

import sys

# FIXME: print as DEBUG: to __syslog__ file
#print sys.path
#print os.environ['PATH']
#print sys.version

# bugfix #13314 : make sure that the wrapper (spawned process) is detached from Ganga session
# the process will not receive Control-C signals
# using fork and doing setsid() before exec would probably be a bit
# better (to avoid slim chance that the signal is propagated to this
# process before setsid is reached)
# this is only enabled if the first argument is 'subprocess' in order to enable
# running this script by hand from outside ganga (which is sometimes useful)
if len(sys.argv)>1 and sys.argv[1] == 'subprocess':
    os.setsid()

############################################################################################

###INLINEMODULES###

############################################################################################

input_sandbox = ###INPUT_SANDBOX###
sharedoutputpath= ###SHAREDOUTPUTPATH###
outputpatterns = ###OUTPUTPATTERNS###
appscriptpath = ###APPSCRIPTPATH###
environment = ###ENVIRONMENT###
workdir = ###WORKDIR###

statusfilename = os.path.join(sharedoutputpath,'__jobstatus__')

try:
    statusfile=file(statusfilename,'w')
except IOError,x:
    print 'ERROR: not able to write a status file: ', statusfilename
    print 'ERROR: ',x
    raise

line='START: '+ time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) + os.linesep
statusfile.writelines(line)
statusfile.flush()

os.chdir(workdir)

# -- WARNING: get the input files including the python modules BEFORE sys.path.insert()
# -- SINCE PYTHON 2.6 THERE WAS A SUBTLE CHANGE OF SEMANTICS IN THIS AREA

for f in input_sandbox:
    getPackedInputSandbox(f)

# -- END OF MOVED CODE BLOCK

import sys
sys.path.insert(0, ###GANGADIR###)
sys.path.insert(0,os.path.join(os.getcwd(),PYTHON_DIR))

try:
    import subprocess
except ImportError,x:
    sys.path.insert(0,###SUBPROCESS_PYTHONPATH###)
    import subprocess
try:
    import tarfile
except ImportError,x:
    sys.path.insert(0,###TARFILE_PYTHONPATH###)
    import tarfile


for key,value in environment.iteritems():
    os.environ[key] = value

outfile=file('stdout','w')
errorfile=file('stderr','w')

sys.stdout=file('./__syslog__','w')
sys.stderr=sys.stdout

###MONITORING_SERVICE###
monitor = createMonitoringObject()
monitor.start()

import subprocess

import time #datetime #disabled for python2.2 compatiblity

try:
    child = subprocess.Popen(appscriptpath, shell=False, stdout=outfile, stderr=errorfile)
except OSError,x:
    print >> statusfile, 'EXITCODE: %d'%-9999
    # UTC, like the START and STOP lines, so all timestamps in this file are comparable
    print >> statusfile, 'FAILED: %s'%time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time()))
    print >> statusfile, 'PROBLEM STARTING THE APPLICATION SCRIPT: %s %s'%(appscriptpath,str(x))
    statusfile.close()
    sys.exit()

print >> statusfile, 'PID: %d'%child.pid
statusfile.flush()

result = -1

try:
    while 1:
        result = child.poll()
        if result is not None:
            break
        outfile.flush()
        errorfile.flush()
        monitor.progress()
        time.sleep(0.3)
finally:
    monitor.progress()
    sys.stdout=sys.__stdout__
    sys.stderr=sys.__stderr__

monitor.stop(result)

outfile.flush()
errorfile.flush()

createOutputSandbox(outputpatterns,None,sharedoutputpath)

outfile.close()
errorfile.close()

from Ganga.Utility.files import recursive_copy

for fn in ['stdout','stderr','__syslog__']:
    try:
        recursive_copy(fn,sharedoutputpath)
    except Exception,x:
        print 'ERROR: (job'+###JOBID###+')',x

line="EXITCODE: " + repr(result) + os.linesep
line+='STOP: '+time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) + os.linesep
statusfile.writelines(line)
statusfile.close()
sys.exit()

"""

    import inspect
    script = script.replace('###INLINEMODULES###', inspect.getsource(Sandbox.WNSandbox))

    script = script.replace('###APPLICATION_NAME###', repr(job.application._name))
    script = script.replace('###INPUT_SANDBOX###', repr(subjob_input_sandbox + master_input_sandbox))
    script = script.replace('###SHAREDOUTPUTPATH###', repr(sharedoutputpath))
    script = script.replace('###APPSCRIPTPATH###', repr(appscriptpath))
    script = script.replace('###OUTPUTPATTERNS###', repr(outputpatterns))
    script = script.replace('###JOBID###', repr(job.getFQID('.')))
    script = script.replace('###ENVIRONMENT###', repr(environment))
    script = script.replace('###WORKDIR###', repr(workdir))

    script = script.replace('###MONITORING_SERVICE###', job.getMonitoringService().getWrapperScriptConstructorText())

    self.workdir = workdir

    from Ganga.Utility.Config import getConfig

    script = script.replace('###GANGADIR###', repr(getConfig('System')['GANGA_PYTHONPATH']))

    # Fallback locations used by the wrapper when the worker's Python lacks
    # the subprocess/tarfile modules.
    import Ganga.PACKAGE
    script = script.replace('###SUBPROCESS_PYTHONPATH###', repr(Ganga.PACKAGE.setup.getPackagePath2('subprocess', 'syspath', force=True)))
    script = script.replace('###TARFILE_PYTHONPATH###', repr(Ganga.PACKAGE.setup.getPackagePath2('tarfile', 'syspath', force=True)))

    return job.getInputWorkspace().writefile(FileBuffer('__jobscript__', script), executable=1)
358
360 import os,signal
361
362 job = self.getJobObject()
363
364 ok = True
365 try:
366
367
368 os.kill(-self.wrapper_pid,signal.SIGKILL)
369 except OSError,x:
370 logger.warning('while killing wrapper script for job %d: pid=%d, %s',job.id,self.wrapper_pid,str(x))
371 ok = False
372
373
374 try:
375 ws = os.waitpid(self.wrapper_pid,0)
376 except OSError,x:
377 logger.warning('problem while waitpid %d: %s',job.id,x)
378
379 from Ganga.Utility.files import recursive_copy
380
381 for fn in ['stdout','stderr','__syslog__']:
382 try:
383 recursive_copy(os.path.join(self.workdir,fn),job.getOutputWorkspace().getPath())
384 except Exception,x:
385 logger.info('problem retrieving %s: %s',fn,x)
386
387 self.remove_workdir()
388 return 1
389
391 if config['remove_workdir']:
392 import shutil
393 try:
394 shutil.rmtree(self.workdir)
395 except OSError,x:
396 logger.warning('problem removing the workdir %s: %s',str(self.id),str(x))
397
410
def get_pid(f):
    """Return the application PID recorded in the status file, or None.

    The job wrapper appends a line of the form ``PID: <digits>`` to its
    ``__jobstatus__`` file after starting the application process.

    :param f: path of the ``__jobstatus__`` file.
    :return: the PID as an int, or None when no ``PID:`` line with at least
        one digit is present.
    :raises IOError: if the file cannot be opened or read.
    """
    import re
    # Close the handle explicitly: this is called whenever a submitted job
    # is polled, so a leaked descriptor per call adds up.
    statusfile = open(f)
    try:
        stat = statusfile.read()
    finally:
        statusfile.close()
    # Require at least one digit; an empty match would make int('') raise
    # ValueError instead of reporting "no PID yet".
    m = re.compile(r'^PID: (?P<pid>\d+)', re.M).search(stat)
    if m is None:
        return None
    return int(m.group('pid'))
421
422 logger.debug('local ping: %s',str(jobs))
423
424 for j in jobs:
425 outw=j.getOutputWorkspace()
426
427
428 try:
429 statusfile = os.path.join(outw.getPath(),'__jobstatus__')
430 if j.status == 'submitted':
431 pid = get_pid(statusfile)
432 if pid:
433 j.backend.id = pid
434
435 j.updateStatus('running')
436 exitcode = get_exit_code(statusfile)
437 logger.debug('status file: %s %s',statusfile,file(statusfile).read())
438 except IOError,x:
439 logger.debug('problem reading status file: %s (%s)',statusfile,str(x))
440 exitcode=None
441 except Exception,x:
442 logger.critical('problem during monitoring: %s',str(x))
443 import traceback
444 traceback.print_exc()
445 raise x
446
447
448
449 try:
450 ws = os.waitpid(j.backend.wrapper_pid,os.WNOHANG)
451 if not Ganga.Utility.logic.implies(ws[0]!=0,ws[1]==0):
452
453
454
455 logger.critical('wrapper script for job %s exit with code %d',str(j.id),ws[1])
456 logger.critical('report this as a bug at http://savannah.cern.ch/bugs/?group=ganga')
457 j.updateStatus('failed')
458 except OSError,x:
459 if x.errno != errno.ECHILD:
460 logger.warning('cannot do waitpid for %d: %s',j.backend.wrapper_pid,str(x))
461
462
463
464 if not exitcode is None:
465
466 j.backend.exitcode = exitcode
467
468 if exitcode == 0:
469 j.updateStatus('completed')
470 else:
471 j.updateStatus('failed')
472
473
474
475
476
477
478 j.backend.remove_workdir()
479
480
481 updateMonitoringInformation = staticmethod(updateMonitoringInformation)
482