1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """Module containing class for running jobs interactively"""
22
23 __author__ = "K.Harrison <Harrison@hep.phy.cam.ac.uk>"
24 __date__ = "6 February 2008"
25 __version__ = "1.4"
26
27 from Ganga.Core import Sandbox
28 from Ganga.GPIDev.Adapters.IBackend import IBackend
29 from Ganga.GPIDev.Lib.File import FileBuffer
30 from Ganga.GPIDev.Schema import ComponentItem, Schema, SimpleItem, Version
31 from Ganga.Utility import logging, tempfile, util
32 from Ganga.Utility.Config import getConfig
33
34 import inspect
35 import os
36 import re
37 import shutil
38 import signal
39 import time
40
41
42 logger = logging.getLogger()
43
45 """Run jobs interactively on local host.
46
47 Interactive job prints output directly on screen and takes the input from the keyboard.
48 So it may be interupted with Ctrl-C
49 """
50
51 _schema = Schema( Version( 1, 0 ), {\
52 "id" : SimpleItem( defvalue = 0, protected=1, copyable = 0,
53 doc = "Process id" ),
54 "status" : SimpleItem( defvalue = "new", protected = 1, copyable = 0,
55 doc = "Backend status" ),
56 "exitcode" : SimpleItem( defvalue = 0, protected = 1, copyable = 0,
57 doc = "Process exit code" ),
58 "workdir" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
59 doc = "Work directory" ),
60 "actualCE" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
61 doc = "Name of machine where job is run" ) })
62
63 _category = "backends"
64 _name = 'Interactive'
65
68
71 value = -999
72 job = self.getJobObject()
73 if keyword and outfileName and hasattr( job, "outputdir" ):
74 outfilePath = os.path.join( job.outputdir, outfileName )
75 try:
76 statfile = open( outfilePath )
77 statString = statfile.read()
78 testString = "".join( [ "^", keyword, " (?P<value>\\d*)" ] )
79 regexp = re.compile( testString, re.M )
80 match = regexp.search( statString )
81 if match:
82 value = int( match.group( "value" ) )
83 statfile.close()
84 except IOError:
85 pass
86 return value
87
88 - def submit( self, jobconfig, master_input_sandbox ):
89 """Submit job to backend (i.e. run job interactively).
90
91 Arguments other than self:
92 subjobconfig - Dictionary of subjob properties
93 master_input_sandbox - Dictionary of properties of master job
94
95 Return value: True always"""
96
97 job = self.getJobObject()
98
99 scriptpath = self.preparejob( jobconfig, master_input_sandbox )
100 return self._submit(scriptpath)
101
104
120
121
123
124 """Method for killing job running on backend
125
126 No arguments other than self:
127
128 Return value: True always"""
129
130 job = self.getJobObject()
131
132 if not self.id:
133 time.sleep( 0.2 )
134 self.id = self._getIntFromOutfile( "PID:", "__id__" )
135
136 try:
137 os.kill( self.id, signal.SIGKILL )
138 except OSError,x:
139 logger.warning( "Problem killing process %d for job %d: %s",\
140 self.id, job.id, str( x ) )
141
142 self.status = "killed"
143 self.remove_workdir()
144
145 return True
146
148
149 """Method for removing job's work directory
150
151 No arguments other than self:
152
153 Return value: None"""
154
155 try:
156 shutil.rmtree( self.workdir )
157 except OSError,x:
158 logger.warning( "Problem removing workdir %s: %s", self.workdir,\
159 str( x ) )
160
161 return None
162
163 - def preparejob( self, jobconfig, master_input_sandbox ):
164
165 """Method for preparing job script"""
166
167 job = self.getJobObject()
168 inbox = job.createPackedInputSandbox( jobconfig.getSandboxFiles() )
169 inbox.extend( master_input_sandbox )
170 inpDir = job.getInputWorkspace().getPath()
171 outDir = job.getOutputWorkspace().getPath()
172 workdir = tempfile.mkdtemp()
173 self.workdir = workdir
174 exeString = repr( jobconfig.getExeString() )
175 argList = jobconfig.getArgStrings()
176 argString = " ".join( map( lambda x : "' \\'%s\\' '" % x, argList ) )
177
178 commandList = [
179 "#!/usr/bin/env python",
180 "# Interactive job wrapper created by Ganga",
181 "# %s" % ( time.strftime( "%c" ) ),
182 "",
183 inspect.getsource( Sandbox.WNSandbox ),
184 "import os",
185 "import sys",
186 "import time",
187 "",
188 "sys.path.insert( 0, '%s' )" % \
189 getConfig( "System" )[ "GANGA_PYTHONPATH" ],
190 "",
191 "statfileName = os.path.join( '%s', '__jobstatus__' )" % outDir,
192 "try:",
193 " statfile = open( statfileName, 'w' )",
194 "except IOError, x:",
195 " print 'ERROR: Unable to write status file: %s' % statfileName",
196 " print 'ERROR: ',x",
197 " raise",
198 "",
199 "idfileName = os.path.join( '%s', '__id__' )" % outDir,
200 "try:",
201 " idfile = open( idfileName, 'w' )",
202 "except IOError, x:",
203 " print 'ERROR: Unable to write id file: %s' % idfileName",
204 " print 'ERROR: ',x",
205 " raise",
206 "idfile.close()",
207 "",
208 "timeString = time.strftime"\
209 + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )",
210 "statfile.write( 'START: ' + timeString + os.linesep )",
211 "",
212 "os.chdir( '%s' )" % workdir,
213 "for inFile in %s:" % inbox,
214 " getPackedInputSandbox( inFile )",
215 "",
216 "for key, value in %s.iteritems():" % jobconfig.env,
217 " os.environ[ key ] = value",
218 "",
219 "pyCommandList = [",
220 " 'import os',",
221 " 'idfileName = \"%s\"' % idfileName,",
222 " 'idfile = open( idfileName, \"a\" )',",
223 " 'idfile.write( \"PID: \" + str( os.getppid() ) )',",
224 " 'idfile.flush()',",
225 " 'idfile.close()' ]",
226 "pyCommandString = ';'.join( pyCommandList )",
227 "",
228 "commandList = [",
229 " 'python -c \\\'%s\\\'' % pyCommandString,",
230 " 'exec ' " + exeString + " " + argString + "]",
231 "commandString = ';'.join( commandList )",
232 "",
233 "result = os.system( '%s' % commandString )",
234 "",
235 "createOutputSandbox( %s, None, '%s' )" % \
236 ( jobconfig.outputbox, outDir ),
237 "",
238 "statfile.write( 'EXITCODE: ' + str( result >> 8 ) + os.linesep )",
239 "timeString = time.strftime"\
240 + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )",
241 "statfile.write( 'STOP: ' + timeString + os.linesep )",
242 "statfile.flush()",
243 "statfile.close()" ]
244
245 commandString = "\n".join( commandList )
246 return job.getInputWorkspace().writefile\
247 ( FileBuffer( "__jobscript__", commandString), executable = 1 )
248
280
281 updateMonitoringInformation = staticmethod( updateMonitoringInformation )
282