1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 """Module containing class for handling job submission to Condor backend"""
58
59 __author__ = "K.Harrison <Harrison@hep.phy.cam.ac.uk>"
60 __date__ = "09 August 2009"
61 __version__ = "2.5"
62
63 from CondorRequirements import CondorRequirements
64
65 from Ganga.Core import Sandbox
66 from Ganga.GPIDev.Adapters.IBackend import IBackend
67 from Ganga.GPIDev.Lib.File.FileBuffer import FileBuffer
68 from Ganga.GPIDev.Schema import ComponentItem, Schema, SimpleItem, Version
69 from Ganga.Utility.ColourText import Foreground, Effects
70
71 import Ganga.Utility.Config
72 import Ganga.Utility.logging
73
74 import commands
75 import inspect
76 import os
77 import shutil
78 import time
79
80 logger = Ganga.Utility.logging.getLogger()
81
83 """Condor backend - submit jobs to a Condor pool.
84
85 For more options see help on CondorRequirements.
86 """
87
# Schema (version 1.0) declaring the attributes of the Condor backend
# component.  Entries marked protected = 1, copyable = 0 are monitoring
# values filled in by Ganga itself rather than set by the user.
_schema = Schema( Version( 1, 0 ), {
    "requirements" : ComponentItem( category = "condor_requirements",
        defvalue = "CondorRequirements",
        doc = "Requirements for selecting execution host" ),
    "env" : SimpleItem( defvalue = {},
        doc = 'Environment settings for execution host' ),
    # Fixed typo in user-visible doc text: "envrionment" -> "environment".
    "getenv" : SimpleItem( defvalue = "False",
        doc = 'Flag to pass current environment to execution host' ),
    "rank" : SimpleItem( defvalue = "Memory",
        doc = "Ranking scheme to be used when selecting execution host" ),
    "submit_options" : SimpleItem( defvalue = [], typelist = [ "str" ],
        sequence = 1, doc = "Options passed to Condor at submission time" ),
    "id" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
        doc = "Condor jobid" ),
    "status" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
        doc = "Condor status" ),
    "cputime" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
        doc = "CPU time used by job" ),
    "actualCE" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
        doc = "Machine where job has been submitted" ),
    "shared_filesystem" : SimpleItem( defvalue = True,
        doc = "Flag indicating if Condor nodes have shared filesystem" ),
    "universe" : SimpleItem( defvalue = "vanilla",
        doc = "Type of execution environment to be used by Condor" ),
    "globusscheduler" : SimpleItem( defvalue = "",
        doc = "Globus scheduler to be used (required for Condor-G submission)" ),
    "globus_rsl" : SimpleItem( defvalue = "",
        doc = "Globus RSL settings (for Condor-G submission)" ),
    } )
118
# Ganga component-registration data for this backend plugin.
_category = "backends"
_name = "Condor"

# Mapping from Condor's numeric JobStatus codes (reported as strings by
# the monitoring commands) to human-readable state names.
statusDict = {
    "0": "Unexpanded",
    "1": "Idle",
    "2": "Running",
    "3": "Removed",
    "4": "Completed",
    "5": "Held",
}
130
133
def submit( self, jobconfig, master_input_sandbox ):
    """Submit a job to the Condor backend.

    Builds the Condor description file for the job, then hands the
    resulting path to submit_cdf.

    Return value: True if job is submitted successfully,
    or False otherwise"""

    description_path = self.preparejob( jobconfig, master_input_sandbox )
    return self.submit_cdf( description_path )
143
def submit_cdf( self, cdfpath = "" ):
    """Submit Condor Description File.

    Argument other than self:
       cdfpath - path to Condor Description File to be submitted

    Return value: True if job is submitted successfully,
    or False otherwise"""

    commandList = [ "condor_submit -v" ]
    commandList.extend( self.submit_options )
    commandList.append( cdfpath )
    commandString = " ".join( commandList )

    status, output = commands.getstatusoutput( commandString )

    self.id = ""
    if 0 != status:
        logger.error\
            ( "Tried submitting job with command: '%s'" % commandString )
        logger.error( "Return code: %s" % str( status ) )
        logger.error( "Condor output:" )
        logger.error( output )
    else:
        # condor_submit -v prints one "** Proc <cluster>.<proc>:" line
        # per queued process; use the first to recover the local id.
        tmpList = output.split( "\n" )
        for item in tmpList:
            if 1 + item.find( "** Proc" ):
                localId = item.strip( ":" ).split()[ 2 ]
                queryCommand = " ".join\
                    ( [ "condor_q -format \"%s\" GlobalJobId", localId ] )
                qstatus, qoutput = commands.getstatusoutput( queryCommand )
                # Bug fix: previously this tested 'status' (always 0 in
                # this branch), so a condor_q failure was silently
                # ignored and garbage output stored as the job id.
                if 0 != qstatus:
                    logger.warning\
                        ( "Problem determining global id for Condor job '%s'" % \
                        localId )
                    # Fall back to the local id when the global id
                    # cannot be determined.
                    self.id = localId
                else:
                    self.id = qoutput
                break

    # Bug fix: compare by value rather than identity ('is ""').
    return self.id != ""
185
def resubmit( self ):
    """Resubmit job that has already been configured.

    Return value: True if job is resubmitted successfully,
    or False otherwise"""

    job = self.getJobObject()
    input_dir = job.getInputWorkspace().getPath()
    output_dir = job.getOutputWorkspace().getPath()

    # Start from an empty output workspace: remove any existing
    # directory (or stray file) at that path, then recreate it.
    if os.path.isdir( output_dir ):
        shutil.rmtree( output_dir )
    if os.path.exists( output_dir ):
        os.remove( output_dir )
    os.mkdir( output_dir )

    # The description file written by preparejob at first submission.
    cdf_path = os.path.join( input_dir, "__cdf__" )

    if not os.path.exists( cdf_path ):
        logger.warning\
            ( "No Condor Description File for job '%s' found in '%s'" % \
            ( str( job.id ), input_dir ) )
        logger.warning( "Resubmission failed" )
        return False

    return self.submit_cdf( cdf_path )
217
def kill( self ):
    """Kill running job.

    No arguments other than self

    Return value: True if job killed successfully,
    or False otherwise"""

    job = self.getJobObject()

    if not self.id:
        logger.warning( "Job %s not running" % job.id )
        return False

    # A global Condor id has the form "schedd#local_id#timestamp";
    # when all three parts are present, address the owning schedd
    # explicitly with -name.
    parts = job.backend.id.split( "#" )
    if len( parts ) == 3:
        killCommand = "condor_rm -name %s %s" % ( parts[ 0 ], parts[ 2 ] )
    else:
        killCommand = "condor_rm %s" % ( parts[ 0 ] )

    status, output = commands.getstatusoutput( killCommand )

    if status != 0:
        logger.warning\
            ( "Return code '%s' killing job '%s' - Condor id '%s'" % \
            ( str( status ), job.id, job.backend.id ) )
        logger.warning( "Tried command: '%s'" % killCommand )
        logger.warning( "Command output: '%s'" % output )
        logger.warning( "Anyway continuing with job removal" )

    # The job is marked removed even when condor_rm reported an error:
    # removal is deliberately best-effort.
    job.backend.status = "Removed"
    return True
253
def preparejob( self, jobconfig, master_input_sandbox ):
    """Prepare Condor description file.

    Builds a self-contained Python wrapper script for the worker node
    (which unpacks the input sandbox, generates and runs a bash wrapper
    around the application, and prints start/end times and exit code),
    writes it to the job's input workspace, then writes the Condor
    description file "__cdf__" and returns its path.
    """

    job = self.getJobObject()
    # Pack the job's sandbox files into archives in the input workspace.
    inbox = job.createPackedInputSandbox( jobconfig.getSandboxFiles() )
    inpDir = job.getInputWorkspace().getPath()
    outDir = job.getOutputWorkspace().getPath()

    infileList = []

    # Command line to run on the worker node: executable followed by
    # escaped, single-quoted arguments.
    exeString = jobconfig.getExeString().strip()
    quotedArgList = []
    for arg in jobconfig.getArgStrings():
        quotedArgList.append( "\\'%s\\'" % arg )
    exeCmdString = " ".join( [ exeString ] + quotedArgList )

    # Collect unique input files from the job sandbox and the master
    # input sandbox (order preserved, duplicates dropped).
    for filePath in inbox:
        if not filePath in infileList:
            infileList.append( filePath )

    for filePath in master_input_sandbox:
        if not filePath in infileList:
            infileList.append( filePath )

    fileList = []
    for filePath in infileList:
        fileList.append( os.path.basename( filePath ) )

    # Wrapper name: "Ganga_<jobid>_<name>", where <name> is the job name
    # (or the application name) with whitespace collapsed to underscores.
    if job.name:
        name = job.name
    else:
        name = job.application._name
    name = "_".join( name.split() )
    wrapperName = "_".join( [ "Ganga", str( job.id ), name ] )

    # Source lines of the Python wrapper executed on the worker node.
    # Note the wrapper is Python 2 ('print' statements, octal 0755).
    commandList = [
        "#!/usr/bin/env python",
        "# Condor job wrapper created by Ganga",
        "# %s" % ( time.strftime( "%c" ) ),
        "",
        # Inline the source of Ganga's worker-node sandbox helpers so
        # the wrapper needs no Ganga installation on the worker.
        inspect.getsource( Sandbox.WNSandbox ),
        "",
        "import os",
        "import time",
        "",
        "startTime = time.strftime"\
        + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )",
        "",
        "for inFile in %s:" % str( fileList ),
        "   getPackedInputSandbox( inFile )",
        "",
        "exePath = '%s'" % exeString,
        "if os.path.isfile( '%s' ):" % os.path.basename( exeString ),
        "   os.chmod( '%s', 0755 )" % os.path.basename( exeString ),
        # The Python wrapper writes a small bash wrapper that reports
        # the host, sanity-checks ${BASH_ENV}, runs the application and
        # propagates its exit code.
        "wrapperName = '%s_bash_wrapper.sh'" % wrapperName,
        "wrapperFile = open( wrapperName, 'w' )",
        "wrapperFile.write( '#!/bin/bash\\n' )",
        "wrapperFile.write( 'echo \"\"\\n' )",
        "wrapperFile.write( 'echo \"Hostname: $(hostname -f)\"\\n' )",
        "wrapperFile.write( 'echo \"\\${BASH_ENV}: ${BASH_ENV}\"\\n' )",
        "wrapperFile.write( 'if ! [ -z \"${BASH_ENV}\" ]; then\\n' )",
        "wrapperFile.write( ' if ! [ -f \"${BASH_ENV}\" ]; then\\n' )",
        "wrapperFile.write( ' echo \"*** Warning: "\
        + "\\${BASH_ENV} file not found ***\"\\n' )",
        "wrapperFile.write( ' fi\\n' )",
        "wrapperFile.write( 'fi\\n' )",
        "wrapperFile.write( 'echo \"\"\\n' )",
        "wrapperFile.write( '%s\\n' )" % exeCmdString,
        "wrapperFile.write( 'exit ${?}\\n' )",
        "wrapperFile.close()",
        "os.chmod( wrapperName, 0755 )",
        "result = os.system( './%s' % wrapperName )",
        "os.remove( wrapperName )",
        "",
        "endTime = time.strftime"\
        + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )",
        "print '\\nJob start: ' + startTime",
        "print 'Job end: ' + endTime",
        "print 'Exit code: %s' % str( result )"
        ]

    commandString = "\n".join( commandList )
    # Write the wrapper into the input workspace, marked executable;
    # 'wrapper' is the path used as the Condor 'executable'.
    wrapper = job.getInputWorkspace().writefile\
        ( FileBuffer( wrapperName, commandString), executable = 1 )

    infileString = ",".join( infileList )
    # NOTE(review): outfileString is computed but never used below -
    # output files are presumably collected elsewhere; confirm.
    outfileString = ",".join( jobconfig.outputbox )

    # Core Condor description-file settings; stdout/stderr/log land in
    # the job's output workspace via 'initialdir'.
    cdfDict = \
        {
        'universe' : self.universe,
        'on_exit_remove' : 'True',
        'should_transfer_files' : 'YES',
        'when_to_transfer_output' : 'ON_EXIT_OR_EVICT',
        'executable' : wrapper,
        'transfer_executable' : 'True',
        'notification' : 'Never',
        'rank' : self.rank,
        'initialdir' : outDir,
        'error' : 'stderr',
        'output' : 'stdout',
        'log' : 'condorLog',
        'stream_output' : 'false',
        'stream_error' : 'false',
        'getenv' : self.getenv
        }

    envList = []
    # Backend-level environment settings; string values get shell
    # variables expanded, everything else is stringified.
    if self.env:
        for key in self.env.keys():
            value = self.env[ key ]
            if ( type( value ) == type( "" ) ):
                value = os.path.expandvars( value )
            else:
                value = str( value )
            envList.append( "=".join( [ key, value ] ) )
    envString = ";".join( envList )
    # Job-level environment settings are appended after the backend's.
    if jobconfig.env:
        for key in jobconfig.env.keys():
            value = jobconfig.env[ key ]
            if ( type( value ) == type( "" ) ):
                value = os.path.expandvars( value )
            else:
                value = str( value )
            envList.append( "=".join( [ key, value ] ) )
    envString = ";".join( envList )
    if envString:
        cdfDict[ 'environment' ] = envString

    if infileString:
        cdfDict[ 'transfer_input_files' ] = infileString

    # Condor-G settings are emitted only when configured.
    if self.globusscheduler:
        cdfDict[ 'globusscheduler' ] = self.globusscheduler

    if self.globus_rsl:
        cdfDict[ 'globus_rsl' ] = self.globus_rsl

    # Assemble the description file: header comment, "key = value"
    # lines, the requirements clause, then the "queue" command.
    cdfList = [
        "# Condor Description File created by Ganga",
        "# %s" % ( time.strftime( "%c" ) ),
        "" ]
    for key, value in cdfDict.iteritems():
        cdfList.append( "%s = %s" % ( key, value ) )
    cdfList.append( self.requirements.convert() )
    cdfList.append( "queue" )
    cdfString = "\n".join( cdfList )

    # Write "__cdf__" to the input workspace and return its path.
    return job.getInputWorkspace().writefile\
        ( FileBuffer( "__cdf__", cdfString) )
407
554
# Expose updateMonitoringInformation as a static method so the
# monitoring loop can call it without a backend instance.
# NOTE(review): the function definition itself lies outside this
# excerpt (original lines 408-553 are not shown) - confirm it exists
# in the full file.
updateMonitoringInformation = \
    staticmethod( updateMonitoringInformation )
557