Package Ganga :: Package Lib :: Package Localhost :: Module Localhost
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Lib.Localhost.Localhost

  1  from Ganga.GPIDev.Adapters.IBackend import IBackend 
  2  from Ganga.GPIDev.Schema import * 
  3   
  4  import Ganga.Utility.logging 
  5  logger = Ganga.Utility.logging.getLogger() 
  6   
  7  import Ganga.Utility.Config 
  8  config = Ganga.Utility.Config.makeConfig('Local','parameters of the local backend (jobs in the background on localhost)') 
  9   
 10  config.addOption('remove_workdir', True, 'remove automatically the local working directory when the job completed') 
 11   
 12  import Ganga.Utility.logic, Ganga.Utility.util 
 13   
 14  from Ganga.GPIDev.Lib.File import FileBuffer 
 15   
 16  import os,sys 
 17  import os.path,re,errno 
 18   
 19  import subprocess 
 20   
 21  import datetime 
 22  import time 
 23   
 24  import re 
 25   
class Localhost(IBackend):
    """Run jobs in the background on local host.

    The job is run in the workdir (usually in /tmp).
    """
    # Each schema key may appear only once: in a dict literal a repeated key
    # silently replaces the earlier entry, so a second declaration of the
    # same attribute would be dead code.
    _schema = Schema(Version(1,2), {
        # pid of the application process; the monitoring loop reads it back
        # from the PID: line of the __jobstatus__ file
        'id'          : SimpleItem(defvalue=-1,protected=1,copyable=0,doc='Process id.'),
        'status'      : SimpleItem(defvalue=None,typelist=None,protected=1,copyable=0,hidden=1,doc='*NOT USED*'),
        # taken from the EXITCODE: line of the __jobstatus__ file
        'exitcode'    : SimpleItem(defvalue=None,typelist=['int','type(None)'],protected=1,copyable=0,doc='Process exit code.'),
        # temporary directory created by preparejob(); the job runs inside it
        'workdir'     : SimpleItem(defvalue='',protected=1,copyable=0,doc='Working directory.'),
        # set to the local hostname by run()
        'actualCE'    : SimpleItem(defvalue='',protected=1,copyable=0,doc='Hostname where the job was submitted.'),
        # pid of the wrapper script started by run(); -1 until it is launched
        'wrapper_pid' : SimpleItem(defvalue=-1,protected=1,copyable=0,hidden=1,doc='(internal) process id of the execution wrapper'),
        # when non-zero the application command is prefixed with 'nice -n'
        'nice'        : SimpleItem(defvalue=0, doc='adjust process priority using nice -n command'),
        })
    _category = 'backends'
    _name = 'Local'
    _GUIPrefs = [ { 'attribute' : 'nice', 'widget' : 'String' },
                  { 'attribute' : 'id', 'widget' : 'Int' },
                  { 'attribute' : 'status' , 'widget' : 'String' },
                  { 'attribute' : 'exitcode', 'widget' : 'String' } ]
    _GUIAdvancedPrefs = [ { 'attribute' : 'nice', 'widget' : 'String' },
                          { 'attribute' : 'exitcode', 'widget' : 'String' } ]
51 - def __init__(self):
52 super(Localhost,self).__init__()
53
54 - def submit(self,jobconfig,master_input_sandbox):
55 self.run(self.preparejob(jobconfig,master_input_sandbox)) 56 return 1
57
58 - def resubmit(self):
59 job = self.getJobObject() 60 import shutil 61 62 try: 63 shutil.rmtree(self.workdir) 64 except OSError,x: 65 import errno 66 if x.errno != errno.ENOENT: 67 logger.error('problem cleaning the workdir %s, %s',self.workdir,str(x)) 68 return 0 69 try: 70 os.mkdir(self.workdir) 71 except Exception,x: 72 logger.error('cannot make the workdir %s, %s',self.workdir,str(x)) 73 return 0 74 return self.run(job.getInputWorkspace().getPath('__jobscript__'))
75
76 - def run(self,scriptpath):
77 try: 78 process=subprocess.Popen(["python",scriptpath,'subprocess']) 79 except OSError,x: 80 logger.error('cannot start a job process: %s',str(x)) 81 return 0 82 self.wrapper_pid=process.pid 83 self.actualCE = Ganga.Utility.util.hostname() 84 return 1
85
86 - def peek( self, filename = "", command = "" ):
87 """ 88 Allow viewing of output files in job's work directory 89 (i.e. while job is in 'running' state) 90 91 Arguments other than self: 92 filename : name of file to be viewed 93 => Path specified relative to work directory 94 command : command to be used for file viewing 95 96 Return value: None 97 """ 98 job = self.getJobObject() 99 topdir = self.workdir.rstrip( os.sep ) 100 path = os.path.join( topdir, filename ).rstrip( os.sep ) 101 job.viewFile( path = path, command = command ) 102 return None
103
104 - def getStateTime(self, status):
105 """Obtains the timestamps for the 'running', 'completed', and 'failed' states. 106 107 The __jobstatus__ file in the job's output directory is read to obtain the start and stop times of the job. 108 These are converted into datetime objects and returned to the user. 109 """ 110 j = self.getJobObject() 111 end_list = ['completed', 'failed'] 112 d = {} 113 checkstr='' 114 115 if status == 'running': checkstr='START:' 116 elif status == 'completed': checkstr='STOP:' 117 elif status == 'failed': checkstr='FAILED:' 118 else: 119 checkstr='' 120 121 if checkstr=='': 122 logger.debug("In getStateTime(): checkstr == ''") 123 return None 124 125 try: 126 p = os.path.join(j.outputdir, '__jobstatus__') 127 logger.debug("Opening output file at: %s", p) 128 f = open(p) 129 except IOError: 130 logger.debug('unable to open file %s', p) 131 return None 132 133 for l in f.readlines(): 134 if checkstr in l: 135 pos=l.find(checkstr) 136 timestr=l[pos+len(checkstr)+1:pos+len(checkstr)+25] 137 try: 138 t = datetime.datetime(*(time.strptime(timestr, "%a %b %d %H:%M:%S %Y")[0:6])) 139 except ValueError: 140 logger.debug("Value Error in file: '%s': string does not match required format.", p) 141 return None 142 return t 143 144 logger.debug("Reached the end of getStateTime('%s'). Returning None.", status) 145 return None
146
147 - def timedetails(self):
148 """Return all available timestamps from this backend. 149 """ 150 j = self.getJobObject() 151 try: ## check for file. if it's not there don't bother calling getSateTime (twice!) 152 p = os.path.join(j.outputdir, '__jobstatus__') 153 logger.debug("Opening output file at: %s", p) 154 f = open(p) 155 f.close() 156 except IOError: 157 logger.error('unable to open file %s', p) 158 return None 159 del f 160 r = self.getStateTime('running') 161 c = self.getStateTime('completed') 162 d = {'START' : r, 'STOP' : c} 163 164 return d
165
166 - def preparejob(self,jobconfig,master_input_sandbox):
167 168 job = self.getJobObject() 169 mon = job.getMonitoringService() 170 import Ganga.Core.Sandbox as Sandbox 171 subjob_input_sandbox = job.createPackedInputSandbox(jobconfig.getSandboxFiles() 172 + Sandbox.getGangaModulesAsSandboxFiles(Sandbox.getDefaultModules()) 173 + Sandbox.getGangaModulesAsSandboxFiles(mon.getSandboxModules())) 174 175 appscriptpath = [jobconfig.getExeString()]+jobconfig.getArgStrings() 176 if self.nice: 177 appscriptpath = ['nice','-n %d'%self.nice] + appscriptpath 178 if self.nice < 0: 179 logger.warning('increasing process priority is often not allowed, your job may fail due to this') 180 181 sharedoutputpath=job.getOutputWorkspace().getPath() 182 outputpatterns=jobconfig.outputbox 183 environment=jobconfig.env 184 185 from Ganga.Utility import tempfile 186 workdir = tempfile.mkdtemp() 187 188 script= """#!/usr/bin/env python 189 190 import os,os.path,shutil,tempfile 191 import sys,time 192 193 import sys 194 195 # FIXME: print as DEBUG: to __syslog__ file 196 #print sys.path 197 #print os.environ['PATH'] 198 #print sys.version 199 200 # bugfix #13314 : make sure that the wrapper (spawned process) is detached from Ganga session 201 # the process will not receive Control-C signals 202 # using fork and doing setsid() before exec would probably be a bit 203 # better (to avoid slim chance that the signal is propagated to this 204 # process before setsid is reached) 205 # this is only enabled if the first argument is 'subprocess' in order to enable 206 # running this script by hand from outside ganga (which is sometimes useful) 207 if len(sys.argv)>1 and sys.argv[1] == 'subprocess': 208 os.setsid() 209 210 ############################################################################################ 211 212 ###INLINEMODULES### 213 214 ############################################################################################ 215 216 input_sandbox = ###INPUT_SANDBOX### 217 sharedoutputpath= ###SHAREDOUTPUTPATH### 218 outputpatterns = 
###OUTPUTPATTERNS### 219 appscriptpath = ###APPSCRIPTPATH### 220 environment = ###ENVIRONMENT### 221 workdir = ###WORKDIR### 222 223 statusfilename = os.path.join(sharedoutputpath,'__jobstatus__') 224 225 try: 226 statusfile=file(statusfilename,'w') 227 except IOError,x: 228 print 'ERROR: not able to write a status file: ', statusfilename 229 print 'ERROR: ',x 230 raise 231 232 line='START: '+ time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) + os.linesep 233 statusfile.writelines(line) 234 statusfile.flush() 235 236 os.chdir(workdir) 237 238 # -- WARNING: get the input files including the python modules BEFORE sys.path.insert() 239 # -- SINCE PYTHON 2.6 THERE WAS A SUBTLE CHANGE OF SEMANTICS IN THIS AREA 240 241 for f in input_sandbox: 242 getPackedInputSandbox(f) 243 244 # -- END OF MOVED CODE BLOCK 245 246 import sys 247 sys.path.insert(0, ###GANGADIR###) 248 sys.path.insert(0,os.path.join(os.getcwd(),PYTHON_DIR)) 249 250 try: 251 import subprocess 252 except ImportError,x: 253 sys.path.insert(0,###SUBPROCESS_PYTHONPATH###) 254 import subprocess 255 try: 256 import tarfile 257 except ImportError,x: 258 sys.path.insert(0,###TARFILE_PYTHONPATH###) 259 import tarfile 260 261 262 for key,value in environment.iteritems(): 263 os.environ[key] = value 264 265 outfile=file('stdout','w') 266 errorfile=file('stderr','w') 267 268 sys.stdout=file('./__syslog__','w') 269 sys.stderr=sys.stdout 270 271 ###MONITORING_SERVICE### 272 monitor = createMonitoringObject() 273 monitor.start() 274 275 import subprocess 276 277 import time #datetime #disabled for python2.2 compatiblity 278 279 try: 280 child = subprocess.Popen(appscriptpath, shell=False, stdout=outfile, stderr=errorfile) 281 except OSError,x: 282 file('tt','w').close() 283 print >> statusfile, 'EXITCODE: %d'%-9999 284 print >> statusfile, 'FAILED: %s'%time.strftime('%a %b %d %H:%M:%S %Y') #datetime.datetime.utcnow().strftime('%a %b %d %H:%M:%S %Y') 285 print >> statusfile, 'PROBLEM STARTING THE APPLICATION 
SCRIPT: %s %s'%(appscriptpath,str(x)) 286 statusfile.close() 287 sys.exit() 288 289 print >> statusfile, 'PID: %d'%child.pid 290 statusfile.flush() 291 292 result = -1 293 294 try: 295 while 1: 296 result = child.poll() 297 if result is not None: 298 break 299 outfile.flush() 300 errorfile.flush() 301 monitor.progress() 302 time.sleep(0.3) 303 finally: 304 monitor.progress() 305 sys.stdout=sys.__stdout__ 306 sys.stderr=sys.__stderr__ 307 308 monitor.stop(result) 309 310 outfile.flush() 311 errorfile.flush() 312 313 createOutputSandbox(outputpatterns,None,sharedoutputpath) 314 315 outfile.close() 316 errorfile.close() 317 318 from Ganga.Utility.files import recursive_copy 319 320 for fn in ['stdout','stderr','__syslog__']: 321 try: 322 recursive_copy(fn,sharedoutputpath) 323 except Exception,x: 324 print 'ERROR: (job'+###JOBID###+')',x 325 326 line="EXITCODE: " + repr(result) + os.linesep 327 line+='STOP: '+time.strftime('%a %b %d %H:%M:%S %Y',time.gmtime(time.time())) + os.linesep 328 statusfile.writelines(line) 329 sys.exit() 330 331 """ 332 333 import inspect 334 script = script.replace('###INLINEMODULES###',inspect.getsource(Sandbox.WNSandbox)) 335 336 script = script.replace('###APPLICATION_NAME###',repr(job.application._name)) 337 script = script.replace('###INPUT_SANDBOX###',repr(subjob_input_sandbox+master_input_sandbox)) 338 script = script.replace('###SHAREDOUTPUTPATH###',repr(sharedoutputpath)) 339 script = script.replace('###APPSCRIPTPATH###',repr(appscriptpath)) 340 script = script.replace('###OUTPUTPATTERNS###',repr(outputpatterns)) 341 script = script.replace('###JOBID###',repr(job.getFQID('.'))) 342 script = script.replace('###ENVIRONMENT###',repr(environment)) 343 script = script.replace('###WORKDIR###',repr(workdir)) 344 345 script = script.replace('###MONITORING_SERVICE###',job.getMonitoringService().getWrapperScriptConstructorText()) 346 347 self.workdir = workdir 348 349 from Ganga.Utility.Config import getConfig 350 351 script = 
script.replace('###GANGADIR###',repr(getConfig('System')['GANGA_PYTHONPATH'])) 352 353 import Ganga.PACKAGE 354 script = script.replace('###SUBPROCESS_PYTHONPATH###',repr(Ganga.PACKAGE.setup.getPackagePath2('subprocess','syspath',force=True))) 355 script = script.replace('###TARFILE_PYTHONPATH###',repr(Ganga.PACKAGE.setup.getPackagePath2('tarfile','syspath',force=True))) 356 357 return job.getInputWorkspace().writefile(FileBuffer('__jobscript__',script),executable=1)
358
359 - def kill(self):
360 import os,signal 361 362 job = self.getJobObject() 363 364 ok = True 365 try: 366 # kill the wrapper script 367 # bugfix: #18178 - since wrapper script sets a new session and new group, we can use this to kill all processes in the group 368 os.kill(-self.wrapper_pid,signal.SIGKILL) 369 except OSError,x: 370 logger.warning('while killing wrapper script for job %d: pid=%d, %s',job.id,self.wrapper_pid,str(x)) 371 ok = False 372 373 # waitpid to avoid zombies 374 try: 375 ws = os.waitpid(self.wrapper_pid,0) 376 except OSError,x: 377 logger.warning('problem while waitpid %d: %s',job.id,x) 378 379 from Ganga.Utility.files import recursive_copy 380 381 for fn in ['stdout','stderr','__syslog__']: 382 try: 383 recursive_copy(os.path.join(self.workdir,fn),job.getOutputWorkspace().getPath()) 384 except Exception,x: 385 logger.info('problem retrieving %s: %s',fn,x) 386 387 self.remove_workdir() 388 return 1
389
390 - def remove_workdir(self):
391 if config['remove_workdir']: 392 import shutil 393 try: 394 shutil.rmtree(self.workdir) 395 except OSError,x: 396 logger.warning('problem removing the workdir %s: %s',str(self.id),str(x))
397
399 400 def get_exit_code(f): 401 import re 402 statusfile=file(f) 403 stat = statusfile.read() 404 m = re.compile(r'^EXITCODE: (?P<exitcode>-?\d*)',re.M).search(stat) 405 406 if m is None: 407 return None 408 else: 409 return int(m.group('exitcode'))
410 411 def get_pid(f): 412 import re 413 statusfile=file(f) 414 stat = statusfile.read() 415 m = re.compile(r'^PID: (?P<pid>\d*)',re.M).search(stat) 416 417 if m is None: 418 return None 419 else: 420 return int(m.group('pid'))
421 422 logger.debug('local ping: %s',str(jobs)) 423 424 for j in jobs: 425 outw=j.getOutputWorkspace() 426 427 # try to get the application exit code from the status file 428 try: 429 statusfile = os.path.join(outw.getPath(),'__jobstatus__') 430 if j.status == 'submitted': 431 pid = get_pid(statusfile) 432 if pid: 433 j.backend.id = pid 434 #logger.info('Local job %d status changed to running, pid=%d',j.id,pid) 435 j.updateStatus('running') # bugfix: 12194 436 exitcode = get_exit_code(statusfile) 437 logger.debug('status file: %s %s',statusfile,file(statusfile).read()) 438 except IOError,x: 439 logger.debug('problem reading status file: %s (%s)',statusfile,str(x)) 440 exitcode=None 441 except Exception,x: 442 logger.critical('problem during monitoring: %s',str(x)) 443 import traceback 444 traceback.print_exc() 445 raise x 446 447 # check if the exit code of the wrapper script is available (non-blocking check) 448 # if the wrapper script exited with non zero this is an error 449 try: 450 ws = os.waitpid(j.backend.wrapper_pid,os.WNOHANG) 451 if not Ganga.Utility.logic.implies(ws[0]!=0,ws[1]==0): 452 #FIXME: for some strange reason the logger DOES NOT LOG (checked in python 2.3 and 2.5) 453 ##print 'logger problem', logger.name 454 ##print 'logger',logger.getEffectiveLevel() 455 logger.critical('wrapper script for job %s exit with code %d',str(j.id),ws[1]) 456 logger.critical('report this as a bug at http://savannah.cern.ch/bugs/?group=ganga') 457 j.updateStatus('failed') 458 except OSError,x: 459 if x.errno != errno.ECHILD: 460 logger.warning('cannot do waitpid for %d: %s',j.backend.wrapper_pid,str(x)) 461 462 # if the exit code was collected for the application get the exit code back 463 464 if not exitcode is None: 465 # status file indicates that the application finished 466 j.backend.exitcode = exitcode 467 468 if exitcode == 0: 469 j.updateStatus('completed') 470 else: 471 j.updateStatus('failed') 472 473 #logger.info('Local job %d finished with exitcode 
%d',j.id,exitcode) 474 475 ##if j.outputdata: 476 ## j.outputdata.fill() 477 478 j.backend.remove_workdir() 479 480 481 updateMonitoringInformation = staticmethod(updateMonitoringInformation) 482