Package Ganga :: Package Lib :: Package Interactive :: Module Interactive'
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Lib.Interactive.Interactive'

  1  ############################################################################### 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: Interactive.py,v 1.1 2008-07-17 16:40:57 moscicki Exp $ 
  5  ############################################################################### 
  6  # File: Interactive.py 
  7  # Author: K. Harrison 
  8  # Created: 060720 
  9  # Version 1.0: 060728 
 10  # 
 11  # KH 060803 - Corrected _getJobObject to getJobObject 
 12  # 
 13  # KH 060829 - Updated to use Sandbox module 
 14  # 
 15  # KH 060901 - Updates in submit and preparejob methods, for core changes 
 16  # 
 17  # KH 061103 - Corrected to take into account master input sandbox 
 18  # 
 19  # KH 080306 - Corrections from VR 
 20   
 21  """Module containing class for running jobs interactively""" 
 22                                                                                   
 23  __author__  = "K.Harrison <Harrison@hep.phy.cam.ac.uk>" 
 24  __date__    = "6 February 2008" 
 25  __version__ = "1.4" 
 26   
 27  from Ganga.Core import Sandbox 
 28  from Ganga.GPIDev.Adapters.IBackend import IBackend 
 29  from Ganga.GPIDev.Lib.File import FileBuffer 
 30  from Ganga.GPIDev.Schema import ComponentItem, Schema, SimpleItem, Version 
 31  from Ganga.Utility import logging, tempfile, util 
 32  from Ganga.Utility.Config import getConfig 
 33   
 34  import inspect 
 35  import os 
 36  import re 
 37  import shutil 
 38  import signal 
 39  import time 
 40   
 41   
 42  logger = logging.getLogger() 
 43   
44 -class Interactive( IBackend ):
45 """Run jobs interactively on local host. 46 47 Interactive job prints output directly on screen and takes the input from the keyboard. 48 So it may be interupted with Ctrl-C 49 """ 50 51 _schema = Schema( Version( 1, 0 ), {\ 52 "id" : SimpleItem( defvalue = 0, protected=1, copyable = 0, 53 doc = "Process id" ), 54 "status" : SimpleItem( defvalue = "new", protected = 1, copyable = 0, 55 doc = "Backend status" ), 56 "exitcode" : SimpleItem( defvalue = 0, protected = 1, copyable = 0, 57 doc = "Process exit code" ), 58 "workdir" : SimpleItem( defvalue = "", protected = 1, copyable = 0, 59 doc = "Work directory" ), 60 "actualCE" : SimpleItem( defvalue = "", protected = 1, copyable = 0, 61 doc = "Name of machine where job is run" ) }) 62 63 _category = "backends" 64 _name = 'Interactive' 65
66 - def __init__( self ):
67 super( Interactive, self ).__init__()
68
69 - def _getIntFromOutfile\ 70 ( self, keyword = "", outfileName = "" ):
71 value = -999 72 job = self.getJobObject() 73 if keyword and outfileName and hasattr( job, "outputdir" ): 74 outfilePath = os.path.join( job.outputdir, outfileName ) 75 try: 76 statfile = open( outfilePath ) 77 statString = statfile.read() 78 testString = "".join( [ "^", keyword, " (?P<value>\\d*)" ] ) 79 regexp = re.compile( testString, re.M ) 80 match = regexp.search( statString ) 81 if match: 82 value = int( match.group( "value" ) ) 83 statfile.close() 84 except IOError: 85 pass 86 return value
87
88 - def submit( self, jobconfig, master_input_sandbox ):
89 """Submit job to backend (i.e. run job interactively). 90 91 Arguments other than self: 92 subjobconfig - Dictionary of subjob properties 93 master_input_sandbox - Dictionary of properties of master job 94 95 Return value: True always""" 96 97 job = self.getJobObject() 98 99 scriptpath = self.preparejob( jobconfig, master_input_sandbox ) 100 return self._submit(scriptpath)
101
102 - def resubmit( self ):
103 return self._submit(self.getJobObject().getInputWorkspace().getPath("__jobscript__"))
104
105 - def _submit( self, scriptpath):
106 job = self.getJobObject() 107 self.actualCE = util.hostname() 108 logger.info('Starting job %d', job.id) 109 110 try: 111 job.updateStatus( "submitted" ) 112 self.status = "submitted" 113 os.spawnv( os.P_WAIT, scriptpath, ( scriptpath, ) ) 114 self.status = "completed" 115 except KeyboardInterrupt: 116 self.status = "killed" 117 pass 118 119 return True
120 121
122 - def kill( self ):
123 124 """Method for killing job running on backend 125 126 No arguments other than self: 127 128 Return value: True always""" 129 130 job = self.getJobObject() 131 132 if not self.id: 133 time.sleep( 0.2 ) 134 self.id = self._getIntFromOutfile( "PID:", "__id__" ) 135 136 try: 137 os.kill( self.id, signal.SIGKILL ) 138 except OSError,x: 139 logger.warning( "Problem killing process %d for job %d: %s",\ 140 self.id, job.id, str( x ) ) 141 142 self.status = "killed" 143 self.remove_workdir() 144 145 return True
146
147 - def remove_workdir( self ):
148 149 """Method for removing job's work directory 150 151 No arguments other than self: 152 153 Return value: None""" 154 155 try: 156 shutil.rmtree( self.workdir ) 157 except OSError,x: 158 logger.warning( "Problem removing workdir %s: %s", self.workdir,\ 159 str( x ) ) 160 161 return None
162
163 - def preparejob( self, jobconfig, master_input_sandbox ):
164 165 """Method for preparing job script""" 166 167 job = self.getJobObject() 168 inbox = job.createPackedInputSandbox( jobconfig.getSandboxFiles() ) 169 inbox.extend( master_input_sandbox ) 170 inpDir = job.getInputWorkspace().getPath() 171 outDir = job.getOutputWorkspace().getPath() 172 workdir = tempfile.mkdtemp() 173 self.workdir = workdir 174 exeString = repr( jobconfig.getExeString() ) 175 argList = jobconfig.getArgStrings() 176 argString = " ".join( map( lambda x : "' \\'%s\\' '" % x, argList ) ) 177 178 commandList = [ 179 "#!/usr/bin/env python", 180 "# Interactive job wrapper created by Ganga", 181 "# %s" % ( time.strftime( "%c" ) ), 182 "", 183 inspect.getsource( Sandbox.WNSandbox ), 184 "import os", 185 "import sys", 186 "import time", 187 "", 188 "sys.path.insert( 0, '%s' )" % \ 189 getConfig( "System" )[ "GANGA_PYTHONPATH" ], 190 "", 191 "statfileName = os.path.join( '%s', '__jobstatus__' )" % outDir, 192 "try:", 193 " statfile = open( statfileName, 'w' )", 194 "except IOError, x:", 195 " print 'ERROR: Unable to write status file: %s' % statfileName", 196 " print 'ERROR: ',x", 197 " raise", 198 "", 199 "idfileName = os.path.join( '%s', '__id__' )" % outDir, 200 "try:", 201 " idfile = open( idfileName, 'w' )", 202 "except IOError, x:", 203 " print 'ERROR: Unable to write id file: %s' % idfileName", 204 " print 'ERROR: ',x", 205 " raise", 206 "idfile.close()", 207 "", 208 "timeString = time.strftime"\ 209 + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )", 210 "statfile.write( 'START: ' + timeString + os.linesep )", 211 "", 212 "os.chdir( '%s' )" % workdir, 213 "for inFile in %s:" % inbox, 214 " getPackedInputSandbox( inFile )", 215 "", 216 "for key, value in %s.iteritems():" % jobconfig.env, 217 " os.environ[ key ] = value", 218 "", 219 "pyCommandList = [", 220 " 'import os',", 221 " 'idfileName = \"%s\"' % idfileName,", 222 " 'idfile = open( idfileName, \"a\" )',", 223 " 'idfile.write( \"PID: \" + str( os.getppid() ) )',", 224 " 'idfile.flush()',", 225 " 'idfile.close()' ]", 226 "pyCommandString = ';'.join( pyCommandList )", 227 "", 228 "commandList = [", 229 " 'python -c \\\'%s\\\'' % pyCommandString,", 230 " 'exec ' " + exeString + " " + argString + "]", 231 "commandString = ';'.join( commandList )", 232 "", 233 "result = os.system( '%s' % commandString )", 234 "", 235 "createOutputSandbox( %s, None, '%s' )" % \ 236 ( jobconfig.outputbox, outDir ), 237 "", 238 "statfile.write( 'EXITCODE: ' + str( result >> 8 ) + os.linesep )", 239 "timeString = time.strftime"\ 240 + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )", 241 "statfile.write( 'STOP: ' + timeString + os.linesep )", 242 "statfile.flush()", 243 "statfile.close()" ] 244 245 commandString = "\n".join( commandList ) 246 return job.getInputWorkspace().writefile\ 247 ( FileBuffer( "__jobscript__", commandString), executable = 1 )
248
249 - def updateMonitoringInformation( jobs ):
250 251 for j in jobs: 252 253 if not j.backend.id: 254 id = j.backend._getIntFromOutfile( "PID:", "__id__" ) 255 if id > 0: 256 j.backend.id = id 257 if ( "submitted" == j.backend.status ): 258 j.backend.status = "running" 259 260 # Check that the process is still alive 261 if j.backend.id: 262 try: 263 os.kill( j.backend.id, 0 ) 264 except: 265 j.backend.status = "completed" 266 267 if j.backend.status in [ "completed", "failed", "killed" ]: 268 j.backend.exitcode = j.backend._getIntFromOutfile\ 269 ( "EXITCODE:", "__jobstatus__" ) 270 # Set job status to failed for non-zero exit code 271 if j.backend.exitcode: 272 if j.backend.exitcode in [ 2, 9, 256 ]: 273 j.backend.status = "killed" 274 else: 275 j.backend.status = "failed" 276 if ( j.backend.status != j.status ): 277 j.updateStatus( j.backend.status ) 278 279 return None
280 281 updateMonitoringInformation = staticmethod( updateMonitoringInformation )
282