Package Ganga :: Package Lib :: Package Condor :: Module Condor
[hide private]
[frames] | [no frames]

Source Code for Module Ganga.Lib.Condor.Condor

  1  ############################################################################### 
  2  # Ganga Project. http://cern.ch/ganga 
  3  # 
  4  # $Id: Condor.py,v 1.8 2009/04/02 17:52:24 karl Exp $ 
  5  ############################################################################### 
  6  # File: Condor.py 
  7  # Author: K. Harrison 
  8  # Created: 051228 
  9  # 
 10  # KH - 060321: Additions from Johannes Elmsheuser, to allow use with Condor-G 
 11  # 
 12  # KH - 060728: Changes for framework migration 
 13  # 
 14  # KH - 061012: Updates in submit and preparejob methods, for core changes 
 15  # 
 16  # KH - 061027: Corrections for case of no shared filesystem 
 17  # 
 18  # KH - 080213: Changes made to use global Condor id for tracking job status 
 19  # 
 20  # KH - 080215: Correction for passing quoted strings as a command-line argument 
 21  #              to an executable 
 22  # 
 23  # KH - 080410: Corrections made to kill() method of Condor class 
 24  # 
 25  # KH - 080410: Implemented resubmit() method of Condor class 
 26  # 
 27  # KH - 080729: Updates for changes to JobConfig class in Ganga 5 
 28  #              Error message printed in case submit command fails 
 29  # 
 30  # KH - 081008 : Added typelist information for schema property "submit_options" 
 31  # 
 32  # KH - 081102 : Remove spurious print statement 
 33  # 
 34  # KH - 090128 : Added getenv property to Condor class, to allow environment of 
 35  #               submit machine to be passed to worker node 
 36  # 
 37  #               Added creation of bash job wrapper, to allow environment 
 38  #               setup by defining BASH_ENV to point to setup script 
 39  # 
 40  #               Set status to failed for job with non-zero exit code 
 41  # 
 42  # KH - 090307 : Modified kill() method to assume success even in case of 
 43  #               non-zero return code from condor_rm  
 44  # 
 45  # KH - 090308 : Modified updateMonitoringInformation() method 
 46  #               to deal with case where all queues are empty 
 47  # 
 48  # KH - 090402 : Corrected bug that meant application arguments were ignored 
 49  # 
 50  #               In script to be run on worker node, print warning if unable 
 51  #               to find startup script pointed to by BASH_ENV 
 52  # 
 53  # KH - 090809 : Changed logic for updating job status to final value - 
 54  #               Condor log file is now searched to check whether job 
 55  #               is marked as "aborted" or "terminated" 
 56   
 57  """Module containing class for handling job submission to Condor backend""" 
 58   
 59  __author__  = "K.Harrison <Harrison@hep.phy.cam.ac.uk>" 
 60  __date__    = "09 August 2009" 
 61  __version__ = "2.5" 
 62   
 63  from CondorRequirements import CondorRequirements 
 64   
 65  from Ganga.Core import Sandbox 
 66  from Ganga.GPIDev.Adapters.IBackend import IBackend 
 67  from Ganga.GPIDev.Lib.File.FileBuffer import FileBuffer 
 68  from Ganga.GPIDev.Schema import ComponentItem, Schema, SimpleItem, Version 
 69  from Ganga.Utility.ColourText import Foreground, Effects 
 70   
 71  import Ganga.Utility.Config  
 72  import Ganga.Utility.logging 
 73   
 74  import commands 
 75  import inspect 
 76  import os 
 77  import shutil 
 78  import time 
 79   
 80  logger = Ganga.Utility.logging.getLogger() 
 81   
class Condor( IBackend ):

    """Condor backend - submit jobs to a Condor pool.

       For more options see help on CondorRequirements.
    """

    # Schema version 1.0: user-settable submission options plus
    # protected bookkeeping properties filled in by the backend
    _schema = Schema( Version( 1, 0 ), {
        "requirements" : ComponentItem( category = "condor_requirements",
            defvalue = "CondorRequirements",
            doc = "Requirements for selecting execution host" ),
        "env" : SimpleItem( defvalue = {},
            doc = 'Environment settings for execution host' ),
        "getenv" : SimpleItem( defvalue = "False",
            doc = 'Flag to pass current envrionment to execution host' ),
        "rank" : SimpleItem( defvalue = "Memory",
            doc = "Ranking scheme to be used when selecting execution host" ),
        "submit_options" : SimpleItem( defvalue = [], typelist = [ "str" ],
            sequence = 1, doc = "Options passed to Condor at submission time" ),
        "id" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
            doc = "Condor jobid" ),
        "status" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
            doc = "Condor status"),
        "cputime" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
            doc = "CPU time used by job"),
        "actualCE" : SimpleItem( defvalue = "", protected = 1, copyable = 0,
            doc = "Machine where job has been submitted" ),
        "shared_filesystem" : SimpleItem( defvalue = True,
            doc = "Flag indicating if Condor nodes have shared filesystem" ),
        "universe" : SimpleItem( defvalue = "vanilla",
            doc = "Type of execution environment to be used by Condor" ),
        "globusscheduler" : SimpleItem( defvalue = "", doc =
            "Globus scheduler to be used (required for Condor-G submission)" ),
        "globus_rsl" : SimpleItem( defvalue = "",
            doc = "Globus RSL settings (for Condor-G submission)" ),
        } )

    _category = "backends"
    _name = "Condor"

    # Translation of Condor's numeric JobStatus codes into readable names,
    # used when parsing condor_q output in updateMonitoringInformation
    statusDict = {
        "0" : "Unexpanded",
        "1" : "Idle",
        "2" : "Running",
        "3" : "Removed",
        "4" : "Completed",
        "5" : "Held"
        }
131 - def __init__( self ):
132 super( Condor, self ).__init__()
133
134 - def submit( self, jobconfig, master_input_sandbox ):
135 """Submit job to backend. 136 137 Return value: True if job is submitted successfully, 138 or False otherwise""" 139 140 cdfpath = self.preparejob( jobconfig, master_input_sandbox ) 141 status = self.submit_cdf( cdfpath ) 142 return status
143
144 - def submit_cdf( self, cdfpath = "" ):
145 """Submit Condor Description File. 146 147 Argument other than self: 148 cdfpath - path to Condor Description File to be submitted 149 150 Return value: True if job is submitted successfully, 151 or False otherwise""" 152 153 commandList = [ "condor_submit -v" ] 154 commandList.extend( self.submit_options ) 155 commandList.append( cdfpath ) 156 commandString = " ".join( commandList ) 157 158 status, output = commands.getstatusoutput( commandString ) 159 160 self.id = "" 161 if 0 != status: 162 logger.error\ 163 ( "Tried submitting job with command: '%s'" % commandString ) 164 logger.error( "Return code: %s" % str( status ) ) 165 logger.error( "Condor output:" ) 166 logger.error( output ) 167 else: 168 tmpList = output.split( "\n" ) 169 for item in tmpList: 170 if 1 + item.find( "** Proc" ): 171 localId = item.strip( ":" ).split()[ 2 ] 172 queryCommand = " ".join\ 173 ( [ "condor_q -format \"%s\" GlobalJobId", localId ] ) 174 qstatus, qoutput = commands.getstatusoutput( queryCommand ) 175 if 0 != status: 176 logger.warning\ 177 ( "Problem determining global id for Condor job '%s'" % \ 178 localId ) 179 self.id = localId 180 else: 181 self.id = qoutput 182 break 183 184 return not self.id is ""
185
186 - def resubmit( self ):
187 """Resubmit job that has already been configured. 188 189 Return value: True if job is resubmitted successfully, 190 or False otherwise""" 191 192 job = self.getJobObject() 193 inpDir = job.getInputWorkspace().getPath() 194 outDir = job.getOutputWorkspace().getPath() 195 196 # Delete any existing output files, and recreate output directory 197 if os.path.isdir( outDir ): 198 shutil.rmtree( outDir ) 199 if os.path.exists( outDir ): 200 os.remove( outDir ) 201 os.mkdir( outDir ) 202 203 # Determine path to job's Condor Description File 204 cdfpath = os.path.join( inpDir, "__cdf__" ) 205 206 # Resubmit job 207 if os.path.exists( cdfpath ): 208 status = self.submit_cdf( cdfpath ) 209 else: 210 logger.warning\ 211 ( "No Condor Description File for job '%s' found in '%s'" % \ 212 ( str( job.id ), inpDir ) ) 213 logger.warning( "Resubmission failed" ) 214 status = False 215 216 return status
217
218 - def kill( self ):
219 """Kill running job. 220 221 No arguments other than self 222 223 Return value: True if job killed successfully, 224 or False otherwise""" 225 226 job = self.getJobObject() 227 228 if not self.id: 229 logger.warning( "Job %s not running" % job.id ) 230 return False 231 232 idElementList = job.backend.id.split( "#" ) 233 if 3 == len( idElementList ): 234 killCommand = "condor_rm -name %s %s" % \ 235 ( idElementList[ 0 ], idElementList[ 2 ] ) 236 else: 237 killCommand = "condor_rm %s" % ( idElementList[ 0 ] ) 238 239 status, output = commands.getstatusoutput( killCommand ) 240 241 if ( status != 0 ): 242 logger.warning\ 243 ( "Return code '%s' killing job '%s' - Condor id '%s'" % \ 244 ( str( status ), job.id, job.backend.id ) ) 245 logger.warning( "Tried command: '%s'" % killCommand ) 246 logger.warning( "Command output: '%s'" % output ) 247 logger.warning( "Anyway continuing with job removal" ) 248 249 job.backend.status = "Removed" 250 killStatus = True 251 252 return killStatus
253
254 - def preparejob( self, jobconfig, master_input_sandbox ):
255 """Prepare Condor description file""" 256 257 job = self.getJobObject() 258 inbox = job.createPackedInputSandbox( jobconfig.getSandboxFiles() ) 259 inpDir = job.getInputWorkspace().getPath() 260 outDir = job.getOutputWorkspace().getPath() 261 262 infileList = [] 263 264 exeString = jobconfig.getExeString().strip() 265 quotedArgList = [] 266 for arg in jobconfig.getArgStrings(): 267 quotedArgList.append( "\\'%s\\'" % arg ) 268 exeCmdString = " ".join( [ exeString ] + quotedArgList ) 269 270 for filePath in inbox: 271 if not filePath in infileList: 272 infileList.append( filePath ) 273 274 for filePath in master_input_sandbox: 275 if not filePath in infileList: 276 infileList.append( filePath ) 277 278 fileList = [] 279 for filePath in infileList: 280 fileList.append( os.path.basename( filePath ) ) 281 282 if job.name: 283 name = job.name 284 else: 285 name = job.application._name 286 name = "_".join( name.split() ) 287 wrapperName = "_".join( [ "Ganga", str( job.id ), name ] ) 288 289 commandList = [ 290 "#!/usr/bin/env python", 291 "# Condor job wrapper created by Ganga", 292 "# %s" % ( time.strftime( "%c" ) ), 293 "", 294 inspect.getsource( Sandbox.WNSandbox ), 295 "", 296 "import os", 297 "import time", 298 "", 299 "startTime = time.strftime"\ 300 + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )", 301 "", 302 "for inFile in %s:" % str( fileList ), 303 " getPackedInputSandbox( inFile )", 304 "", 305 "exePath = '%s'" % exeString, 306 "if os.path.isfile( '%s' ):" % os.path.basename( exeString ), 307 " os.chmod( '%s', 0755 )" % os.path.basename( exeString ), 308 "wrapperName = '%s_bash_wrapper.sh'" % wrapperName, 309 "wrapperFile = open( wrapperName, 'w' )", 310 "wrapperFile.write( '#!/bin/bash\\n' )", 311 "wrapperFile.write( 'echo \"\"\\n' )", 312 "wrapperFile.write( 'echo \"Hostname: $(hostname -f)\"\\n' )", 313 "wrapperFile.write( 'echo \"\\${BASH_ENV}: ${BASH_ENV}\"\\n' )", 314 "wrapperFile.write( 'if ! 
[ -z \"${BASH_ENV}\" ]; then\\n' )", 315 "wrapperFile.write( ' if ! [ -f \"${BASH_ENV}\" ]; then\\n' )", 316 "wrapperFile.write( ' echo \"*** Warning: "\ 317 + "\\${BASH_ENV} file not found ***\"\\n' )", 318 "wrapperFile.write( ' fi\\n' )", 319 "wrapperFile.write( 'fi\\n' )", 320 "wrapperFile.write( 'echo \"\"\\n' )", 321 "wrapperFile.write( '%s\\n' )" % exeCmdString, 322 "wrapperFile.write( 'exit ${?}\\n' )", 323 "wrapperFile.close()", 324 "os.chmod( wrapperName, 0755 )", 325 "result = os.system( './%s' % wrapperName )", 326 "os.remove( wrapperName )", 327 "", 328 "endTime = time.strftime"\ 329 + "( '%a %d %b %H:%M:%S %Y', time.gmtime( time.time() ) )", 330 "print '\\nJob start: ' + startTime", 331 "print 'Job end: ' + endTime", 332 "print 'Exit code: %s' % str( result )" 333 ] 334 335 commandString = "\n".join( commandList ) 336 wrapper = job.getInputWorkspace().writefile\ 337 ( FileBuffer( wrapperName, commandString), executable = 1 ) 338 339 infileString = ",".join( infileList ) 340 outfileString = ",".join( jobconfig.outputbox ) 341 342 cdfDict = \ 343 { 344 'universe' : self.universe, 345 'on_exit_remove' : 'True', 346 'should_transfer_files' : 'YES', 347 'when_to_transfer_output' : 'ON_EXIT_OR_EVICT', 348 'executable' : wrapper, 349 'transfer_executable' : 'True', 350 'notification' : 'Never', 351 'rank' : self.rank, 352 'initialdir' : outDir, 353 'error' : 'stderr', 354 'output' : 'stdout', 355 'log' : 'condorLog', 356 'stream_output' : 'false', 357 'stream_error' : 'false', 358 'getenv' : self.getenv 359 } 360 361 envList = [] 362 if self.env: 363 for key in self.env.keys(): 364 value = self.env[ key ] 365 if ( type( value ) == type( "" ) ): 366 value = os.path.expandvars( value ) 367 else: 368 value = str( value ) 369 envList.append( "=".join( [ key, value ] ) ) 370 envString = ";".join( envList ) 371 if jobconfig.env: 372 for key in jobconfig.env.keys(): 373 value = jobconfig.env[ key ] 374 if ( type( value ) == type( "" ) ): 375 value = 
os.path.expandvars( value ) 376 else: 377 value = str( value ) 378 envList.append( "=".join( [ key, value ] ) ) 379 envString = ";".join( envList ) 380 if envString: 381 cdfDict[ 'environment' ] = envString 382 383 if infileString: 384 cdfDict[ 'transfer_input_files' ] = infileString 385 386 if self.globusscheduler: 387 cdfDict[ 'globusscheduler' ] = self.globusscheduler 388 389 if self.globus_rsl: 390 cdfDict[ 'globus_rsl' ] = self.globus_rsl 391 392 # if outfileString: 393 # cdfDict[ 'transfer_output_files' ] = outfileString 394 395 cdfList = [ 396 "# Condor Description File created by Ganga", 397 "# %s" % ( time.strftime( "%c" ) ), 398 "" ] 399 for key, value in cdfDict.iteritems(): 400 cdfList.append( "%s = %s" % ( key, value ) ) 401 cdfList.append( self.requirements.convert() ) 402 cdfList.append( "queue" ) 403 cdfString = "\n".join( cdfList ) 404 405 return job.getInputWorkspace().writefile\ 406 ( FileBuffer( "__cdf__", cdfString) )
407
    def updateMonitoringInformation( jobs ):
        """Update backend status of the given jobs from condor_q output,
           and move jobs whose Condor queue entry has disappeared to their
           final Ganga status (KH - 090809: the Condor log file is searched
           for "terminated"/"aborted" before a final status is assigned)."""

        # Index monitorable jobs by their Condor id
        jobDict = {}
        for job in jobs:
            if job.backend.id:
                jobDict[ job.backend.id ] = job

        idList = jobDict.keys()

        if not idList:
            return

        # One global query for all jobs: id, host, status code, CPU time
        queryCommand = " ".join\
           ( [
           "condor_q -global",
           "-format \"%s \" GlobalJobId",
           "-format \"%s \" RemoteHost",
           "-format \"%d \" JobStatus",
           "-format \"%f\\n\" RemoteUserCpu"
           ] )
        status, output = commands.getstatusoutput( queryCommand )
        if 0 != status:
            logger.error( "Problem retrieving status for Condor jobs" )
            return

        # KH - 090308: condor_q prints this message instead of records
        # when no queue has any entries
        if ( "All queues are empty" == output ):
            infoList = []
        else:
            infoList = output.split( "\n" )

        # Parse each record; RemoteHost is absent (3 fields) for idle jobs
        allDict = {}
        for infoString in infoList:
            tmpList = infoString.split()
            id, host, status, cputime = ( "", "", "", "" )
            if 3 == len( tmpList ):
                id, status, cputime = tmpList
            if 4 == len( tmpList ):
                id, host, status, cputime = tmpList
            if id:
                allDict[ id ] = {}
                allDict[ id ][ "status" ] = Condor.statusDict[ status ]
                allDict[ id ][ "cputime" ] = cputime
                allDict[ id ][ "host" ] = host

        fg = Foreground()
        fx = Effects()
        status_colours = { 'submitted' : fg.orange,
                           'running' : fg.green,
                           'completed' : fg.blue }

        for id in idList:

            printStatus = False
            if jobDict[ id ].status == "killed":
                continue

            # Stored ids may be local only; a global id contains '#'
            localId = id.split( "#" )[ -1 ]
            globalId = id

            if globalId == localId:
                # Try to resolve the local id to a global id
                queryCommand = " ".join\
                   ( [
                   "condor_q -global",
                   "-format \"%s\" GlobalJobId",
                   id
                   ] )
                status, output = commands.getstatusoutput( queryCommand )
                if 0 == status:
                    globalId = output

            if globalId in allDict.keys():
                # Job still queued: refresh backend properties
                status = allDict[ globalId ][ "status" ]
                host = allDict[ globalId ][ "host" ]
                cputime = allDict[ globalId ][ "cputime" ]
                if status != jobDict[ id ].backend.status:
                    printStatus = True
                    jobDict[ id ].backend.status = status
                    if jobDict[ id ].backend.status == "Running":
                        jobDict[ id ].updateStatus( "running" )

                if host:
                    if jobDict[ id ].backend.actualCE != host:
                        jobDict[ id ].backend.actualCE = host
                jobDict[ id ].backend.cputime = cputime
            else:
                # Job no longer in any queue: decide its final status
                jobDict[ id ].backend.status = ""
                outDir = jobDict[ id ].getOutputWorkspace().getPath()
                # NOTE(review): assumes getPath() returns a trailing
                # separator - confirm against Ganga workspace behaviour
                condorLogPath = "".join( [ outDir, "condorLog" ] )
                # Only assign a final status once the Condor log shows
                # the job as terminated or aborted (KH - 090809)
                checkExit = True
                if os.path.isfile( condorLogPath ):
                    checkExit = False
                    condorLog = open( condorLogPath )
                    lineList = condorLog.readlines()
                    condorLog.close()
                    for line in lineList:
                        if -1 != line.find( "terminated" ):
                            checkExit = True
                            break
                        if -1 != line.find( "aborted" ):
                            checkExit = True
                            break

                if checkExit:
                    printStatus = True
                    stdoutPath = "".join( [ outDir, "stdout" ] )
                    jobStatus = "failed"
                    if os.path.isfile( stdoutPath ):
                        # The wrapper prints "Exit code: <n>" as the last
                        # line of stdout; non-zero means failure
                        stdout = open( stdoutPath )
                        lineList = stdout.readlines()
                        stdout.close()
                        try:
                            exitLine = lineList[ -1 ]
                            exitCode = exitLine.strip().split()[ -1 ]
                        except IndexError:
                            exitCode = -1
                        if 0 == int( exitCode ):
                            jobStatus = "completed"

                    jobDict[ id ].updateStatus( jobStatus )

            if printStatus:
                # Report the status change, coloured by Ganga job status
                if jobDict[ id ].backend.actualCE:
                    hostInfo = jobDict[ id ].backend.actualCE
                else:
                    hostInfo = "Condor"
                status = jobDict[ id ].status
                if status_colours.has_key( status ):
                    colour = status_colours[ status ]
                else:
                    colour = fg.magenta
                if "submitted" == status:
                    preposition = "to"
                else:
                    preposition = "on"

                if jobDict[ id ].backend.status:
                    backendStatus = "".join\
                       ( [ " (", jobDict[ id ].backend.status, ") " ] )
                else:
                    backendStatus = ""

                logger.info( colour + 'Job %d %s%s %s %s - %s' + fx.normal,\
                   jobDict[ id ].id, status, backendStatus, preposition, hostInfo,\
                   time.strftime( '%c' ) )

        return None

    # Monitoring loop calls this without an instance
    updateMonitoringInformation = \
       staticmethod( updateMonitoringInformation )