Package Ganga :: Package Core :: Package MonitoringComponent :: Module Local_GangaMC_Service
[hide private]
[frames | no frames]

Source Code for Module Ganga.Core.MonitoringComponent.Local_GangaMC_Service

  1  import Queue, threading, time 
  2   
  3  from Ganga.Core.GangaThread import GangaThread 
  4  from Ganga.Core.GangaRepository import RegistryKeyError, RegistryLockError 
  5   
  6  import sys 
  7  if sys.hexversion >= 0x020600F0: 
  8      Set = set 
  9  else: 
 10      from sets import Set 
 11   
 12  from Ganga.Utility.threads import SynchronisedObject 
 13  from Ganga.Utility.util import IList 
 14   
 15  import Ganga.GPIDev.Credentials as Credentials 
 16  from Ganga.Core.InternalServices import Coordinator  
 17   
 18  # Setup logging --------------- 
 19  import Ganga.Utility.logging 
 20  log = Ganga.Utility.logging.getLogger() 
 21   
 22  from Ganga.Core import BackendError 
 23  import Ganga.GPIDev.Adapters.IBackend 
 24  import Ganga.Utility.Config 
 25  config = Ganga.Utility.Config.makeConfig( 'PollThread', 'background job status monitoring and output retrieval' ) 
 26   
 27  from Ganga.Core import GangaException 
 28   
 29  # some defaults 
 30  config.addOption( 'repeat_messages',False,'if 0 then log only once the errors for a given backend and do not repeat them anymore') 
 31  config.addOption( 'autostart',None,'enable monitoring automatically at startup, in script mode monitoring is disabled by default, in interactive mode it is enabled', type=type(True)) # enable monitoring on startup 
 32  config.addOption( 'base_poll_rate', 2,'internal supervising thread',hidden=1) 
 33  config.addOption( 'MaxNumResubmits', 5,'Maximum number of automatic job resubmits to do before giving') 
 34  config.addOption( 'MaxFracForResubmit', 0.25,'Maximum fraction of failed jobs before stopping automatic resubmission') 
 35  config.addOption( 'update_thread_pool_size' , 5,'Size of the thread pool. Each threads monitors a specific backaend at a given time. Minimum value is one, preferably set to the number_of_backends + 1') 
 36  config.addOption( 'default_backend_poll_rate' , 30,'Default rate for polling job status in the thread pool. This is the default value for all backends.') 
 37  config.addOption( 'Local' , 10,'Poll rate for Local backend.') 
 38  config.addOption( 'LCG' , 30,'Poll rate for LCG backend.') 
 39  config.addOption( 'Condor' , 30,'Poll rate for Condor backend.') 
 40  config.addOption( 'gLite' , 30,'Poll rate for gLite backend.') 
 41  config.addOption( 'LSF' , 20,'Poll rate for LSF backend.') 
 42  config.addOption( 'PBS' , 20,'Poll rate for PBS backend.') 
 43  config.addOption( 'Dirac' , 50,'Poll rate for Dirac backend.') 
 44  config.addOption( 'Panda' , 50,'Poll rate for Panda backend.') 
 45  #config.addOption( 'TestSubmitter', 1, 'Poll rate for TestSubmitter') 
 46   
 47  #Note: the rate of this callback is actually  MAX(base_poll_rate,callbacks_poll_rate) 
 48  config.addOption( 'creds_poll_rate', 30, "The frequency in seconds for credentials checker") 
 49  config.addOption( 'diskspace_poll_rate', 30, "The frequency in seconds for free disk checker") 
 50  config.addOption( 'DiskSpaceChecker', "", "disk space checking callback. This function should return False when there is no disk space available, True otherwise") 
 51   
 52   
 53  config.addOption( 'max_shutdown_retries',5,'OBSOLETE: this option has no effect anymore') 
 54   
 55   
 56  THREAD_POOL_SIZE = config[ 'update_thread_pool_size' ] 
 57  Qin = Queue.Queue() 
 58  ThreadPool = [] 
 59  #number of threads waiting for actions in Qin 
 60  tpFreeThreads = 0 
 61   
 62  # The JobAction class encapsulates a function, its arguments and its post result action  
 63  # based on what is defined as a successful run of the function.  
class JobAction(object):
    """A unit of work for the monitoring thread pool.

    Bundles a callable with its arguments and with the callbacks to invoke
    once the callable has been executed.

    Attributes:
        function: the callable to run (the string 'stop' is used as a
            sentinel that tells a worker thread to terminate).
        args: positional arguments passed to ``function``.
        kwargs: keyword arguments passed to ``function``.
        success: collection of return values regarded as a successful run.
        callback_Success: called without arguments when the result of
            ``function`` is in ``success``.
        callback_Failure: called without arguments when ``function`` raises
            or returns a value that is not in ``success``.
        thread: initialised to None.
        description: initialised to the empty string.
    """

    def __init__(self, function, args=(), kwargs=None,
                 success=(True,),
                 callback_Success=lambda: None,
                 callback_Failure=lambda: None):
        self.function = function
        self.args = args
        # Every action gets its own dict: a ``{}`` default would be one single
        # object shared by all actions created without explicit kwargs.
        if kwargs is None:
            kwargs = {}
        self.kwargs = kwargs
        self.success = success
        self.callback_Success = callback_Success
        self.callback_Failure = callback_Failure
        self.thread = None
        self.description = ''
78
class MonitoringWorkerThread(GangaThread):
    """Worker thread of the monitoring pool: executes actions taken from Qin."""

    def __init__(self, name):
        # NOTE(review): the body of this constructor is missing from the
        # listing this file was recovered from.  It is restored here as the
        # plain base-class initialisation, the same call that
        # JobRegistry_Monitor.__init__ makes -- confirm against the repository.
        GangaThread.__init__(self, name=name)

    def run(self):
        """Thread entry point: process queued actions until asked to stop."""
        self._execUpdateAction()

    # This function takes a JobAction object from the Qin queue,
    # executes the embedded function and runs post result actions.
    def _execUpdateAction(self):
        """Consume JobAction objects from Qin until the thread has to stop.

        The loop ends when should_stop() becomes true or when an action whose
        function is the string 'stop' is received.  Objects that are not
        JobAction instances are ignored.  The module-level counter
        tpFreeThreads is incremented while the thread waits for work and
        decremented once an action has been taken from the queue.
        """
        global tpFreeThreads
        ##DEBUGGING THREADS
        ##import sys
        ##sys.settrace(_trace)
        while not self.should_stop():
            log.debug("%s waiting..." % threading.currentThread())
            #setattr(threading.currentThread(), 'action', None)
            tpFreeThreads += 1

            # Poll with a short timeout so that a stop request is noticed
            # even when the queue stays empty.
            while not self.should_stop():
                try:
                    action = Qin.get(block=True, timeout=0.5)
                    break
                except Queue.Empty:
                    continue

            if self.should_stop():
                break

            tpFreeThreads -= 1
            #setattr(threading.currentThread(), 'action', action)
            log.debug("Qin's size is currently: %d" % Qin.qsize())
            log.debug("%s running..." % threading.currentThread())

            if not isinstance(action, JobAction):
                continue
            if action.function == 'stop':
                break
            try:
                result = action.function(*action.args, **action.kwargs)
            except:
                # Deliberately broad: whatever the action raises is reported
                # through its failure callback and must not kill the worker.
                action.callback_Failure()
            else:
                if result in action.success:
                    action.callback_Success()
                else:
                    action.callback_Failure()
126 127 # Create the thread pool
def _makeThreadPool(threadPoolSize=THREAD_POOL_SIZE, daemonic=True):
    """Create and start the monitoring worker threads.

    Each new MonitoringWorkerThread is named "MonitoringWorker_<index>",
    appended to the module-level ThreadPool list and started.

    Parameters:
        threadPoolSize: number of worker threads to create (defaults to the
            configured 'update_thread_pool_size')
        daemonic: accepted for interface compatibility; it is not used here
    """
    # Honour the argument: the loop used to read the THREAD_POOL_SIZE global
    # directly, so an explicitly requested size was silently ignored.
    for i in range(threadPoolSize):
        t = MonitoringWorkerThread(name="MonitoringWorker_%s" % i)
        ThreadPool.append(t)
        t.start()
133 134
def stop_and_free_thread_pool(fail_cb=None, max_retries=5):
    """
    Shut the thread pool down cleanly and empty the ThreadPool list.

    One 'stop' action per worker is queued and every worker is joined with a
    3 second timeout. While some workers are still alive, a callback was
    supplied and retries remain, the callback is asked whether to wait again.

    Parameters:
     fail_cb : decision callback; a true return value triggers another join
               round on the stalled workers. Without it nothing is retried.
     max_retries : maximum number of additional join rounds
    Example for a decision callback:
         def myfail_cb():
            resp = raw_input("The cleanup procedures did not complete yet. Do you want to wait more?[y/N]")
            return resp.lower()=='y'
    """
    join_timeout = 3

    # every worker consumes exactly one 'stop' action
    for _worker in ThreadPool:
        Qin.put(JobAction('stop'))

    for worker in ThreadPool:
        worker.join(join_timeout)

    #clean shutdown of stalled threads
    retries_left = max_retries
    while fail_cb and retries_left > 0:
        still_running = [w for w in ThreadPool if w.isAlive()]
        if not still_running:
            break
        if not fail_cb():  # the caller does not want to wait any longer
            break
        for worker in still_running:
            worker.join(join_timeout)
        retries_left -= 1

    del ThreadPool[:]

# purge Qin
def _purge_actions_queue():
    """
    Purge Qin: consume the actions that are queued at the time of the call.
    'stop' actions are put back at the end of the queue.
    Note: the producer (i.e JobRegistry_Monitor) should be stopped before the method is called
    """
    # Only the entries present right now are visited, so the 'stop' actions
    # that are put back are not taken out a second time.
    remaining = len(Qin.queue)
    while remaining > 0:
        remaining -= 1
        try:
            action = Qin.get_nowait()
        except Queue.Empty:
            # a worker thread drained the queue in the meantime
            break
        #let the *stop* action in queue, otherwise the worker threads will fail to terminate
        if isinstance(action, JobAction) and action.function == 'stop':
            Qin.put(action)
# Module-level call: the worker threads are created and started on import.
_makeThreadPool()


# Each entry for the updateDict_ts object (based on the UpdateDict class) is a _DictEntry object.
191 -class _DictEntry( object ):
192 - def __init__( self, backendObj, jobSet, entryLock, timeoutCounterMax ):
193 self.backendObj = backendObj 194 self.jobSet = jobSet 195 self.entryLock = entryLock 196 self.timeoutCounterMax = timeoutCounterMax 197 self.timeoutCounter = timeoutCounterMax - 0.01 198 self.timeLastUpdate = 0.0
199
200 - def updateActionTuple( self ):
201 return self.backendObj, self.jobSet, self.entryLock
202 203
class UpdateDict(object):
    """
    The Update Table: maps a backend name to its _DictEntry.

    It is meant to be used wrapped in a SynchronisedObject so as to ensure
    thread safety.
    """

    def __init__(self):
        self.table = {}

    def addEntry(self, backendObj, backendCheckingFunction, jobList, timeoutMax=None):
        """
        Register jobs of a backend for a status update.

        For a backend seen for the first time a table entry is created and an
        update action is queued. For a known backend the jobs are merged into
        the existing job set if the entry lock can be taken without blocking;
        a new update action is queued only when that set was empty.

        Parameters:
         backendObj : backend object (its _name is the table key)
         backendCheckingFunction : function run by the update action; it
                   receives the entry's (backendObj, jobSet, lock) triple
         jobList : jobs to be updated
         timeoutMax : countdown maximum for a new entry (None -> the
                   configured 'default_backend_poll_rate')
        Return:
         None, if jobList is empty
         True, if the jobs were registered
         False, if the entry lock could not be acquired
        """
        if not jobList:
            return
        if timeoutMax is None:
            timeoutMax = config['default_backend_poll_rate']
        log.debug("\n*----addEntry()")
        backend = backendObj._name

        def lock_state(marker, lock):
            # debug snapshot of the entry lock and of the queue
            return "%s: backend=%s, isLocked=%s, isOwner=%s, joblist=%s, queue=%s" % (
                marker, backend, lock._RLock__count, lock._is_owned(),
                [x.id for x in jobList], Qin.qsize())

        entry = self.table.get(backend)
        if entry is None:  # New backend.
            entry = _DictEntry(backendObj, Set(jobList), threading.RLock(), timeoutMax)
            self.table[backend] = entry
            Qin.put(JobAction(backendCheckingFunction, entry.updateActionTuple()))  # queue to get processed
            log.debug("**Adding %s to new %s backend entry." % ([x.id for x in jobList], backend))
            return True

        # backend is in Qin waiting to be processed. Increase its list of jobs
        # by updating the table entry accordingly. This will reduce the
        # number of update requests.
        _, pending, lock = entry.updateActionTuple()
        log.debug(lock_state("*", lock))
        if not lock.acquire(False):
            log.debug("Could not acquire lock for %s backend. addEntry() skipped." % backend)
            log.debug(lock_state("**", lock))
            return False
        try:
            pending_before = len(pending)
            log.debug("Lock acquire successful. Updating jSet %s with %s." % ([x.id for x in pending], [x.id for x in jobList]))
            pending.update(jobList)
            # If the set was empty it was cleared by an update action
            # i.e. the queue does not contain an update action for the particular backend any more.
            if pending_before:  # set not cleared
                log.debug("%s backend job set exists. Added %s to it." % (backend, [x.id for x in jobList]))
            else:
                Qin.put(JobAction(backendCheckingFunction, self.table[backend].updateActionTuple()))
                log.debug("Added new %s backend update action for jobs %s." % (backend, [x.id for x in self.table[backend].updateActionTuple()[1]]))
        finally:
            lock.release()
        log.debug(lock_state("**", lock))
        return True

    def clearEntry(self, backend):
        """Give the backend a new empty job set and reset its countdown to the maximum."""
        entry = self.table.get(backend)
        if entry is None:
            log.error("Error clearing the %s backend. It does not exist!" % backend)
            return
        entry.jobSet = Set()
        entry.timeoutCounter = entry.timeoutCounterMax

    def timeoutCheck(self):
        """
        Advance the countdown of every backend entry.

        An entry whose counter sits at its maximum gets its lock acquired here
        (if free); the counter is then decreased by the elapsed time on every
        call. When it reaches zero it is reset to just below the maximum and
        the lock is released again.
        """
        for backend, entry in self.table.items():
            lock = entry.entryLock
            # timeoutCounter is reset to its max value ONLY by a successful update action.
            #
            # The initial value and the resets done here are just short of the max value,
            # so the countdown is not started simply because no updates are occurring.
            if not lock._RLock__count and \
               entry.timeoutCounter == entry.timeoutCounterMax and \
               lock.acquire(False):
                log.debug("%s has been reset. Acquired lock to begin countdown." % backend)
                entry.timeLastUpdate = time.time()
            # decrease timeout counter
            if not lock._is_owned():
                continue
            if entry.timeoutCounter <= 0.0:
                entry.timeoutCounter = entry.timeoutCounterMax - 0.01
                entry.timeLastUpdate = time.time()
                lock.release()
                log.debug("%s backend counter timeout. Resetting to %s." % (backend, entry.timeoutCounter))
            else:
                now = time.time()
                entry.timeoutCounter -= now - entry.timeLastUpdate
                entry.timeLastUpdate = now

    def isBackendLocked(self, backend):
        """Return True if the entry lock of the backend is held; False also for unknown backends."""
        entry = self.table.get(backend)
        if entry is None:
            return False
        return bool(entry.entryLock._RLock__count)

    def releaseLocks(self):
        """Release every entry lock owned by the calling thread."""
        for entry in self.table.values():
            if entry.entryLock._is_owned():
                entry.entryLock.release()
# The update table shared by the monitoring threads, wrapped in a
# SynchronisedObject (see the UpdateDict docstring).
updateDict_ts = SynchronisedObject(UpdateDict())
class CallbackHookEntry(object):
    """Settings of one callback hook registered with the monitoring loop.

    Attributes:
        argDict: keyword arguments the hook is called with
        enabled: whether the hook is currently run
        timeout: the frequency in seconds
        _lastRun: time at which the hook was last run (0 = not yet)
    """

    def __init__(self, argDict, enabled=True, timeout=0):
        self.argDict, self.enabled, self.timeout = argDict, enabled, timeout
        self._lastRun = 0
309
class JobRegistry_Monitor(GangaThread):
    """Job monitoring service thread.

    The thread runs *monitoring steps*. A step calls every enabled callback
    hook that is due and then the client callbacks; the hooks do not block,
    they queue JobAction objects on Qin for the worker threads. The loop can
    run continuously (enableMonitoring) or for a fixed number of steps
    (runMonitoring).
    """
    # length (seconds) of one wait slice between two monitoring steps
    uPollRate = 0.5
    # minimum time (seconds) between a monitoring step and an updateJobs() request
    minPollRate = 1.0

    def __init__(self, registry):
        """Set up the hooks and the synchronisation objects (the thread is not started).

        Parameters:
            registry: job registry (or registry slice) whose jobs are monitored
        """
        GangaThread.__init__(self, name="JobRegistry_Monitor")
        self.setDaemon(True)
        self.registry = registry
        self.__sleepCounter = 0.0
        self.__updateTimeStamp = time.time()
        self.progressCallback = lambda x: None
        self.callbackHookDict = {}
        self.clientCallbackDict = {}
        self.alive = True
        self.enabled = False
        #run the monitoring loop continuously (steps=-1) or just a specified number of steps(>0)
        self.steps = -1
        self.activeBackends = {}
        self.updateJobStatus = None
        self.defaultUpdateJobStatus = None
        self.errors = {}
        # Create the default backend update method and add to callback hook.
        self.makeUpdateJobStatusFunction()

        # Add credential checking to monitoring loop
        for _credObj in Credentials._allCredentials.values():
            log.debug("Setting callback hook for %s" % _credObj._name)
            self.setCallbackHook(self.makeCredCheckJobInsertor(_credObj), {}, True, timeout=config['creds_poll_rate'])

        # Add low disk-space checking to monitoring loop
        log.debug("Setting callback hook for disk space checking")
        self.setCallbackHook(self.diskSpaceCheckJobInsertor, {}, True, timeout=config['diskspace_poll_rate'])

        #synch objects
        #main loop mutex
        self.__mainLoopCond = threading.Condition()
        #cleanup synch
        self.__cleanUpEvent = threading.Event()
        #asynch mon loop running synch
        self.__monStepsTerminatedEvent = threading.Event()
        #event to signal the break of job lists iterators
        self.stopIter = threading.Event()
        self.stopIter.set()

    def run(self):
        """
        Main monitoring loop
        """
        import thread
        Ganga.Core.MonitoringComponent.monitoring_thread_id = thread.get_ident()
        del thread

        while self.alive:
            #synchronize the main loop since we can get disable requests
            self.__mainLoopCond.acquire()
            try:
                log.debug('Monitoring loop lock acquired. Running loop')
                #we are blocked here while the loop is disabled
                while not self.enabled:
                    self.__cleanUp()
                    if not self.alive:  #stopped?
                        return
                    #disabled,but still alive, so we keep waiting
                    self.__mainLoopCond.wait()

                self.__monStep()

                #delay here the monitoring steps according to the configuration
                while self.__sleepCounter > 0.0:
                    self.progressCallback(self.__sleepCounter)
                    self.__mainLoopCond.wait(self.uPollRate)
                    if not self.enabled:
                        if not self.alive:  #stopped?
                            self.__cleanUp()
                        #disabled, break to the outer while
                        break
                    else:
                        self.__sleepCounter -= self.uPollRate

                else:
                    # the delay elapsed without a disable request
                    #run on demand?
                    if self.steps > 0:
                        #decrement the remaining number of steps to run
                        self.steps -= 1
                        #requested number of steps executed, disabling...
                        if self.steps <= 0:
                            self.enabled = False
                            #notify the blocking call of runMonitoring()
                            self.__monStepsTerminatedEvent.set()
            finally:
                self.__mainLoopCond.release()

        #final cleanup
        self.__cleanUp()

    def __monStep(self):
        """
        A single monitoring step in the monitoring loop
        Note:
         Internally the step does not block, it produces *actions* that are queued to be run
         in the thread pool
        """
        if not self.callbackHookDict:
            log.error('No callback hooks registered')
            return
        # Iterate over a snapshot of the keys: hooks may be added or removed
        # while the step is running.
        for cbHookFunc in list(self.callbackHookDict.keys()):
            try:
                cbHookEntry = self.callbackHookDict[cbHookFunc]
            except KeyError:
                continue
            if cbHookEntry.enabled and time.time() - cbHookEntry._lastRun >= cbHookEntry.timeout:
                log.debug("Running monitoring callback hook function %s(**%s)" % (cbHookFunc, cbHookEntry.argDict))
                cbHookFunc(**cbHookEntry.argDict)
                cbHookEntry._lastRun = time.time()

        self.runClientCallbacks()
        self.__updateTimeStamp = time.time()
        self.__sleepCounter = config['base_poll_rate']

    def runMonitoring(self, steps=1, timeout=60, jobs=None):
        """
        Enable/Run the monitoring loop and wait for the monitoring steps completion.
        Parameters:
          steps:   number of monitoring steps to run (a positive integer)
          timeout: how long to wait for monitor steps termination (seconds)
          jobs: a registry slice to be monitored (None -> all jobs), it may be passed by the user so ._impl is stripped if needed
        Return:
          False, if the loop cannot be started or the timeout occured while waiting for monitoring termination
          True, if the monitoring steps were successfully executed
        Note:
          This method is meant to be used in Ganga scripts to request monitoring on demand.
        """

        # steps == 0 has to be rejected as well: the loop would treat it as
        # "run continuously" and never signal the termination event this
        # method waits for, so the caller would block forever.
        if type(steps) is not int or steps <= 0:
            log.warning("The number of monitor steps should be a positive integer")
            return False

        if not self.alive:
            log.error("Cannot run the monitoring loop. It has already been stopped")
            return False

        #we do not allow the user to request the monitoring loop while the internal services are stopped
        if not Coordinator.servicesEnabled:
            log.error("Cannot run the monitoring loop."
                      "The internal services are disabled (check your credentials or available disk space)")
            return False

        # if the monitoring is disabled (e.g. scripts)
        if not self.enabled:
            # and there are some required cred which are missing
            # (the monitoring loop does not monitor the credentials so we need to check 'by hand' here)
            _missingCreds = Coordinator.getMissingCredentials()
            if _missingCreds:
                log.error("Cannot run the monitoring loop. The following credentials are required: %s" % _missingCreds)
                return False

        self.__mainLoopCond.acquire()
        log.debug('Monitoring loop lock acquired. Enabling mon loop')
        try:
            if self.enabled or self.__isInProgress():
                log.error("The monitoring loop is already running.")
                return False

            if jobs:
                try:
                    m_jobs = jobs._impl
                except AttributeError:
                    m_jobs = jobs

                # additional check if m_jobs is really a registry slice
                # the underlying code is not prepared to handle correctly the situation if it is not
                from Ganga.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice
                if not isinstance(m_jobs, RegistrySlice):
                    log.warning('runMonitoring: jobs argument must be a registry slice such as a result of jobs.select() or jobs[i1:i2]')
                    return False

                self.registry = m_jobs

            #enable mon loop
            self.enabled = True
            #set how many steps to run
            self.steps = steps
            #enable job list iterators
            self.stopIter.clear()
            # Start backend update timeout checking.
            self.setCallbackHook(updateDict_ts.timeoutCheck, {}, True)

            #wake up the mon loop
            self.__mainLoopCond.notifyAll()
        finally:
            self.__mainLoopCond.release()

        #wait to execute the steps
        self.__monStepsTerminatedEvent.wait()
        self.__monStepsTerminatedEvent.clear()
        #wait the steps to be executed or timeout to occur
        if not self.__awaitTermination(timeout):
            log.warning("Monitoring loop started but did not complete in the given timeout.")
            #force loops termination
            self.stopIter.set()
            return False
        return True

    def enableMonitoring(self):
        """
        Run the monitoring loop continuously
        Return:
          False, if the loop has already been stopped; True otherwise
        """

        if not self.alive:
            log.error("Cannot start monitoring loop. It has already been stopped")
            return False

        self.__mainLoopCond.acquire()
        log.debug('Monitoring loop lock acquired. Enabling mon loop')
        try:
            self.enabled = True
            #infinite loops
            self.steps = -1
            #enable job list iterators
            self.stopIter.clear()
            log.debug('Monitoring loop enabled')
            # Start backend update timeout checking.
            self.setCallbackHook(updateDict_ts.timeoutCheck, {}, True)
            self.__mainLoopCond.notifyAll()
        finally:
            self.__mainLoopCond.release()
        return True

    def disableMonitoring(self):
        """
        Disable the monitoring loop and wait until the monitoring thread has cleaned up
        Return:
          False, if the loop has already been stopped; True otherwise
        """

        if not self.alive:
            log.error("Cannot disable monitoring loop. It has already been stopped")
            return False

        self.__mainLoopCond.acquire()
        log.debug('Monitoring loop lock acquired. Disabling mon loop')
        try:
            self.enabled = False
            self.steps = -1
            self.stopIter.set()
            log.debug('Monitoring loop disabled')
            #wake up the monitoring loop
            self.__mainLoopCond.notifyAll()
        finally:
            self.__mainLoopCond.release()
        #wait for cleanup
        self.__cleanUpEvent.wait()
        self.__cleanUpEvent.clear()
        return True

    def stop(self, fail_cb=None, max_retries=5):
        """
        Stop the monitoring loop
        Parameters:
         fail_cb : if not None, this callback is called if a retry attempt is needed
         max_retries : maximum number of retry attempts
        Note:
         both parameters are currently unused: the thread pool shutdown they
         were meant for is commented out at the end of this method
        Return:
         False, if the loop has already been stopped; True otherwise
        """
        if not self.alive:
            log.warning("Monitoring loop has already been stopped")
            return False

        self.__mainLoopCond.acquire()
        log.info('Stopping the monitoring component...')
        try:
            #signal the main thread to finish
            self.alive = False
            self.enabled = False
            self.steps = -1
            self.stopIter.set()
            #wake up the monitoring loop
            self.__mainLoopCond.notifyAll()
        finally:
            self.__mainLoopCond.release()
        #wait for cleanup
        self.__cleanUpEvent.wait()
        self.__cleanUpEvent.clear()

        ### ---->
        ###wait for all worker threads to finish
        ###self.__awaitTermination()
        ###join the worker threads
        ###stop_and_free_thread_pool(fail_cb,max_retries)
        ###log.info( 'Monitoring component stopped successfully!' )
        return True

    def __cleanUp(self):
        """
        Cleanup function ran in JobRegistry_Monitor thread to disable the monitoring loop
        updateDict_ts.timeoutCheck can hold timeout locks that need to be released
        in order to allow the pool threads to be freed.
        """

        #cleanup the global Qin
        _purge_actions_queue()
        #release timeout check locks
        timeoutCheck = updateDict_ts.timeoutCheck
        if timeoutCheck in self.callbackHookDict:
            updateDict_ts.releaseLocks()
            self.removeCallbackHook(timeoutCheck)
        #wake up the calls waiting for cleanup
        self.__cleanUpEvent.set()

    def __isInProgress(self):
        """Return True while steps remain, actions are queued or a worker thread is busy."""
        return self.steps > 0 or Qin.qsize() > 0 or tpFreeThreads < len(ThreadPool)

    def __awaitTermination(self, timeout=5):
        """
        Wait for resources to be cleaned up (threads,queue)
        Parameters:
         timeout : maximum time to wait (seconds)
        Returns:
         False, on timeout
        """
        while self.__isInProgress():
            time.sleep(self.uPollRate)
            timeout -= self.uPollRate
            if timeout <= 0:
                return False
        return True

    def setCallbackHook(self, func, argDict, enabled, timeout=0):
        """Register func as a callback hook, replacing an existing registration.

        Parameters:
         func : the hook; called as func(**argDict) by the monitoring step
         argDict : keyword arguments for the hook
         enabled : whether the hook is run
         timeout : minimum time between two runs of the hook (seconds)
        """
        log.debug('Setting Callback hook function %s.' % func)
        if func in self.callbackHookDict:
            log.debug('Replacing existing callback hook function with %s' % func)
        self.callbackHookDict[func] = CallbackHookEntry(argDict=argDict, enabled=enabled, timeout=timeout)

    def removeCallbackHook(self, func):
        """Unregister a callback hook; an unknown hook is reported as an error."""
        log.debug('Removing Callback hook function %s.' % func)
        try:
            del self.callbackHookDict[func]
        except KeyError:
            log.error('Callback hook function does not exist.')

    def enableCallbackHook(self, func):
        """Enable a registered callback hook; an unknown hook is reported as an error."""
        log.debug('Enabling Callback hook function %s.' % func)
        try:
            self.callbackHookDict[func].enabled = True
        except KeyError:
            log.error('Callback hook function does not exist.')

    def disableCallbackHook(self, func):
        """Disable a registered callback hook; an unknown hook is reported as an error."""
        log.debug('Disabling Callback hook function %s.' % func)
        try:
            self.callbackHookDict[func].enabled = False
        except KeyError:
            log.error('Callback hook function does not exist.')

    def runClientCallbacks(self):
        """Call every registered client callback with its keyword arguments."""
        # Same pattern as in the monitoring step: work on a snapshot of the
        # keys and skip the entries removed meanwhile, so that a callback may
        # (un)register client callbacks without breaking the iteration.
        for clientFunc in list(self.clientCallbackDict.keys()):
            try:
                argDict = self.clientCallbackDict[clientFunc]
            except KeyError:
                continue
            log.debug('Running client callback hook function %s(**%s).' % (clientFunc, argDict))
            clientFunc(**argDict)

    def setClientCallback(self, clientFunc, argDict):
        """Register (or replace) a client callback called as clientFunc(**argDict)."""
        log.debug('Setting client callback hook function %s(**%s).' % (clientFunc, argDict))
        # a plain dict assignment cannot raise KeyError, so there is nothing to guard
        self.clientCallbackDict[clientFunc] = argDict

    def removeClientCallback(self, clientFunc):
        """Unregister a client callback; an unknown callback is reported as an error."""
        log.debug('Removing client callback hook function %s.' % clientFunc)
        try:
            del self.clientCallbackDict[clientFunc]
        except KeyError:
            # not every callable has a __name__
            log.error("%s not found in client callback dictionary." % getattr(clientFunc, '__name__', clientFunc))

    def __defaultActiveBackendsFunc(self):
        """Return {backend name: [jobs]} for the registry jobs in 'submitted' or 'running' state.

        Write access is requested for every such job; jobs for which the
        registry raises RegistryKeyError or RegistryLockError are skipped.
        """
        active_backends = {}
        # FIXME: this is not thread safe: if the new jobs are added then iteration exception is raised
        for i in self.registry.ids():
            try:
                j = self.registry(i)
                if j.status in ['submitted', 'running']:  #, 'completing' ]:
                    j._getWriteAccess()
                    active_backends.setdefault(j.backend._name, []).append(j)
            except RegistryKeyError:
                pass  # the job was removed
            except RegistryLockError:
                pass  # presumably the job could not be locked for writing -- TODO confirm
        return active_backends

    def makeUpdateJobStatusFunction(self, makeActiveBackendsFunc=None):
        """Create the job status update hook and register it as a callback hook.

        A previously created update hook is unregistered first.

        Parameters:
         makeActiveBackendsFunc : function returning {backend name: [jobs]};
                   None selects the default, based on the job registry
        Return:
         the new hook function
        """
        if makeActiveBackendsFunc is None:
            makeActiveBackendsFunc = self.__defaultActiveBackendsFunc

        def checkBackend(backendObj, jobListSet, lock):  # This function will be run by update threads
            currentThread = threading.currentThread()
            lock.acquire()  # timeout mechanism may have acquired the lock to impose delay.
            try:
                log.debug("[Update Thread %s] Lock acquired for %s" % (currentThread, backendObj._name))
                jobList_fromset = IList([x for x in jobListSet if x.status in ['submitted', 'running']], self.stopIter)
                updateDict_ts.clearEntry(backendObj._name)
                try:
                    log.debug("[Update Thread %s] Updating %s with %s." % (currentThread, backendObj._name, [x.id for x in jobList_fromset]))
                    backendObj.master_updateMonitoringInformation(jobList_fromset)

                    # resubmit if required
                    for j in jobList_fromset:

                        if not j.do_auto_resubmit:
                            continue

                        if len(j.subjobs) == 0:
                            try_resubmit = j.info.submit_counter <= config['MaxNumResubmits']
                        else:
                            # Check for max number of resubmissions
                            if any(s.info.submit_counter > config['MaxNumResubmits'] for s in j.subjobs):
                                continue

                            num_com = len([s for s in j.subjobs if s.status in ['completed']])
                            num_fail = len([s for s in j.subjobs if s.status in ['failed']])

                            # num_fail > 0 also guards the division below
                            try_resubmit = num_fail > 0 and (float(num_fail) / float(num_com + num_fail)) < config['MaxFracForResubmit']

                        if try_resubmit:
                            if j.backend.check_auto_resubmit():
                                log.warning('Auto-resubmit job %d...' % j.id)
                                j.auto_resubmit()

                except BackendError:
                    # sys.exc_info() instead of a bound name keeps the handler
                    # valid on every Python version this module supports
                    x = sys.exc_info()[1]
                    self._handleError(x, x.backend_name, 0)
                except Exception:
                    x = sys.exc_info()[1]
                    self._handleError(x, backendObj._name, 1)
                log.debug("[Update Thread %s] Flushing registry %s." % (currentThread, [x.id for x in jobList_fromset]))
                self.registry._flush(jobList_fromset)  # Optimisation required!
            finally:
                lock.release()
                log.debug("[Update Thread %s] Lock released for %s." % (currentThread, backendObj._name))

        def f(activeBackendsFunc):
            activeBackends = activeBackendsFunc()
            for jList in activeBackends.values():
                backendObj = jList[0].backend
                try:
                    pRate = config[backendObj._name]
                except Exception:
                    # no poll rate option for this backend
                    pRate = config['default_backend_poll_rate']
                # TODO: To include an if statement before adding entry to
                #       updateDict. Entry is added only if credential requirements
                #       of the particular backend is satisfied.
                #       This requires backends to hold relevant information on its
                #       credential requirements.
                updateDict_ts.addEntry(backendObj, checkBackend, jList, pRate)

        if makeActiveBackendsFunc == self.__defaultActiveBackendsFunc:
            self.defaultUpdateJobStatus = f
        if self.updateJobStatus is not None:
            self.removeCallbackHook(self.updateJobStatus)
        self.updateJobStatus = f
        self.setCallbackHook(f, {'activeBackendsFunc': makeActiveBackendsFunc}, True)
        return self.updateJobStatus

    def makeCredCheckJobInsertor(self, credObj):
        """Return a hook that queues a check of the given credential object.

        The hook disables itself while its action is queued; both outcomes of
        the action enable it again, a failure is additionally reported.
        """
        def credCheckJobInsertor():
            def cb_Success():
                self.enableCallbackHook(credCheckJobInsertor)

            def cb_Failure():
                self.enableCallbackHook(credCheckJobInsertor)
                self._handleError('%s checking failed!' % credObj._name, credObj._name, 1)

            log.debug('Inserting %s checking function to Qin.' % credObj._name)
            _action = JobAction(function=self.makeCredChecker(credObj),
                                callback_Success=cb_Success,
                                callback_Failure=cb_Failure)
            self.disableCallbackHook(credCheckJobInsertor)
            try:
                Qin.put(_action)
            except Exception:
                cb_Failure()
        return credCheckJobInsertor

    def makeCredChecker(self, credObj):
        """Return a function that renews the credential: the result of renew(), or False if it raises."""
        def credChecker():
            log.debug("Checking %s." % credObj._name)
            try:
                s = credObj.renew()
            except Exception:
                return False
            else:
                return s
        return credChecker

    def diskSpaceCheckJobInsertor(self):
        """
        Inserts the disk space checking task in the monitoring task queue
        The hook disables itself while the task is queued. A successful check
        enables it again, a failed one leaves it disabled and is reported.
        """
        def cb_Success():
            self.enableCallbackHook(self.diskSpaceCheckJobInsertor)

        def cb_Failure():
            self.disableCallbackHook(self.diskSpaceCheckJobInsertor)
            self._handleError('Available disk space checking failed and it has been disabled!', 'DiskSpaceChecker', False)

        log.debug('Inserting disk space checking function to Qin.')
        _action = JobAction(function=Coordinator._diskSpaceChecker,
                            callback_Success=cb_Success,
                            callback_Failure=cb_Failure)
        self.disableCallbackHook(self.diskSpaceCheckJobInsertor)
        try:
            Qin.put(_action)
        except Exception:
            cb_Failure()

    def updateJobs(self):
        """Request an immediate monitoring step.

        The pending delay is cancelled unless the last step is more recent
        than minPollRate seconds; in that case the delay is set to minPollRate.
        """
        if time.time() - self.__updateTimeStamp >= self.minPollRate:
            self.__sleepCounter = 0.0
        else:
            self.progressCallback("Processing... Please wait.")
            log.debug("Updates too close together... skipping latest update request.")
            self.__sleepCounter = self.minPollRate

    def _handleError(self, x, backend_name, show_traceback):
        """Log a monitoring error and count it per backend.

        Only the first error of a backend is logged unless the
        'repeat_messages' option is set.

        Parameters:
         x : the error (an exception or a message)
         backend_name : key under which the error is counted
         show_traceback : if true, the user exception is logged as well
        """
        def log_error():
            log.error('Problem in the monitoring loop: %s', str(x))
            if show_traceback:
                Ganga.Utility.logging.log_user_exception(log)
        bn = backend_name
        self.errors.setdefault(bn, 0)
        if self.errors[bn] == 0:
            log_error()
            if not config['repeat_messages']:
                log.info('Further error messages from %s handler in the monitoring loop will be skipped.' % bn)
        else:
            if config['repeat_messages']:
                log_error()
        self.errors[bn] += 1


######## THREAD POOL DEBUGGING ###########
852 -def _trace(frame, event, arg):
853 setattr(threading.currentThread(), '_frame', frame)
854
def getStackTrace():
    """Return a text report of the worker threads and of their recorded stacks.

    For every thread in ThreadPool the report contains its name and, if a
    frame has been recorded on the thread by the _trace() function, the
    outer frames of that frame. The content of the Qin queue is printed to
    standard output as a side effect.
    """
    import inspect

    report = ["Available threads:\n"]

    for worker in ThreadPool:
        report.append(" " + worker.getName() + ":\n")

        # '_frame' only exists on threads that have been traced with
        # _trace(); any other worker is listed without a stack instead of
        # raising AttributeError.
        frame = getattr(worker, '_frame', None)
        if frame:
            report.append(" stack:\n")
            for info in inspect.getouterframes(frame):
                # info is (frame, filename, line, function_name, context, index)
                report.append(" %s @ %s # %s\n" % (info[3], info[1], info[2]))

        report.append("\n")

    # single-argument form: valid both as a statement and as a function call
    print("Queue %s" % (Qin.queue,))
    return "".join(report)
875