import Queue, threading, time

from Ganga.Core.GangaThread import GangaThread
from Ganga.Core.GangaRepository import RegistryKeyError, RegistryLockError

import sys
if sys.hexversion >= 0x020600F0:
    Set = set
else:
    from sets import Set

from Ganga.Utility.threads import SynchronisedObject
from Ganga.Utility.util import IList

import Ganga.GPIDev.Credentials as Credentials
from Ganga.Core.InternalServices import Coordinator


import Ganga.Utility.logging
log = Ganga.Utility.logging.getLogger()

from Ganga.Core import BackendError
import Ganga.GPIDev.Adapters.IBackend
import Ganga.Utility.Config
config = Ganga.Utility.Config.makeConfig( 'PollThread', 'background job status monitoring and output retrieval' )

from Ganga.Core import GangaException


config.addOption( 'repeat_messages', False, 'if False then log errors for a given backend only once and do not repeat them')
config.addOption( 'autostart', None, 'enable monitoring automatically at startup; in script mode monitoring is disabled by default, in interactive mode it is enabled', type=type(True))
config.addOption( 'base_poll_rate', 2, 'internal supervising thread', hidden=1)
config.addOption( 'MaxNumResubmits', 5, 'Maximum number of automatic job resubmits to do before giving up')
config.addOption( 'MaxFracForResubmit', 0.25, 'Maximum fraction of failed jobs before stopping automatic resubmission')
config.addOption( 'update_thread_pool_size', 5, 'Size of the thread pool. Each thread monitors a specific backend at a given time. Minimum value is one, preferably set to number_of_backends + 1')
config.addOption( 'default_backend_poll_rate', 30, 'Default rate for polling job status in the thread pool. This is the default value for all backends.')
config.addOption( 'Local', 10, 'Poll rate for Local backend.')
config.addOption( 'LCG', 30, 'Poll rate for LCG backend.')
config.addOption( 'Condor', 30, 'Poll rate for Condor backend.')
config.addOption( 'gLite', 30, 'Poll rate for gLite backend.')
config.addOption( 'LSF', 20, 'Poll rate for LSF backend.')
config.addOption( 'PBS', 20, 'Poll rate for PBS backend.')
config.addOption( 'Dirac', 50, 'Poll rate for Dirac backend.')
config.addOption( 'Panda', 50, 'Poll rate for Panda backend.')


config.addOption( 'creds_poll_rate', 30, "The frequency in seconds of the credentials checker")
config.addOption( 'diskspace_poll_rate', 30, "The frequency in seconds of the free disk space checker")
config.addOption( 'DiskSpaceChecker', "", "disk space checking callback. This function should return False when there is no disk space available, True otherwise")
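# For illustration only: a user-supplied 'DiskSpaceChecker' callback (set in the [PollThread]
# section of the user's configuration) might look like the sketch below; the gangadir path and
# the 300 MB threshold are assumptions, not part of this module:
#
#     import os
#
#     def diskSpaceChecker():
#         # return False when less than ~300 MB are left, True otherwise
#         st = os.statvfs( os.path.expanduser( '~/gangadir' ) )
#         return ( st.f_bavail * st.f_frsize ) > 300 * 1024 * 1024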


config.addOption( 'max_shutdown_retries', 5, 'OBSOLETE: this option has no effect anymore')


THREAD_POOL_SIZE = config[ 'update_thread_pool_size' ]
Qin = Queue.Queue()
ThreadPool = []

tpFreeThreads = 0


class JobAction( object ):
    """A unit of work for the monitoring thread pool: a callable plus its arguments
    and the callbacks to run depending on the result."""
    def __init__( self, function, args = (), kwargs = {},
                  success = ( True, ),
                  callback_Success = lambda: None,
                  callback_Failure = lambda: None ):
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.success = success
        self.callback_Success = callback_Success
        self.callback_Failure = callback_Failure
        self.thread = None
        self.description = ''

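# For illustration only (not part of the original module): a JobAction wrapping an arbitrary
# callable could be queued for the worker threads roughly like this; 'check_backend' and
# 'report_failure' are hypothetical placeholders:
#
#     action = JobAction( function = check_backend,
#                         args = ( backendObj, jobSet, lock ),
#                         success = ( True, ),
#                         callback_Failure = lambda: report_failure( 'backend check failed' ) )
#     Qin.put( action )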

class MonitoringWorkerThread( GangaThread ):
    """Worker thread of the monitoring thread pool: consumes JobAction objects
    from the Qin queue and executes them."""
    def __init__( self, name ):
        GangaThread.__init__( self, name = name )

    def run( self ):
        global tpFreeThreads

        while not self.should_stop():
            log.debug( "%s waiting..." % threading.currentThread() )

            tpFreeThreads += 1

            # wait for the next action, waking up regularly to check for a stop request
            while not self.should_stop():
                try:
                    action = Qin.get( block = True, timeout = 0.5 )
                    break
                except Queue.Empty:
                    continue

            if self.should_stop():
                break

            tpFreeThreads -= 1

            log.debug( "Qin's size is currently: %d" % Qin.qsize() )
            log.debug( "%s running..." % threading.currentThread() )

            if not isinstance( action, JobAction ):
                continue
            if action.function == 'stop':
                break
            try:
                result = action.function( *action.args, **action.kwargs )
            except:
                action.callback_Failure()
            else:
                if result in action.success:
                    action.callback_Success()
                else:
                    action.callback_Failure()


136 """
137 Clean shutdown of the thread pool.
138 A failed attempt to stop all the worker threads is followed by a call to the supplied callback which
139 decides if a new attempt is performed or not.
140 Example for a decision callback:
141 def myfail_cb():
142 resp = raw_input("The cleanup procedures did not complete yet. Do you want to wait more?[y/N]")
143 return resp.lower()=='y'
144 """
145
146 def join_worker_threads(threads,timeout=3):
147 for t in threads:
148 t.join(timeout)
149
150 for i in range( len( ThreadPool ) ):
151 Qin.put( JobAction( 'stop' ) )
152
153 join_worker_threads(ThreadPool)
154
155
156 while True:
157 if not fail_cb or max_retries<=0:
158 break
159 stalled = [t for t in ThreadPool if t.isAlive()]
160 if not stalled:
161 break
162 if fail_cb():
163 join_worker_threads(stalled,timeout=3)
164 max_retries-=1
165 else:
166 break
167
168 del ThreadPool[:]
169
170
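# Usage sketch (assumption, mirroring the docstring above): at shutdown one would call
#
#     stop_and_free_thread_pool( fail_cb = myfail_cb, max_retries = 3 )
#
# where 'myfail_cb' is an interactive decision callback like the example in the docstring.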
172 """
173 Purge Qin: consume the current queued actions
174 Note: the producer (i.e JobRegistry_Monitor) should be stopped before the method is called
175 """
176 from Queue import Empty
177
178 for i in range(len(Qin.queue)):
179 try:
180 action=Qin.get_nowait()
181
182 if isinstance( action, JobAction ) and action.function == 'stop':
183 Qin.put( action )
184 except Empty:
185 break
186
187 _makeThreadPool()
188
189
190
class _DictEntry( object ):
    """Bookkeeping entry of the UpdateDict table: one per monitored backend."""
    def __init__( self, backendObj, jobSet, entryLock, timeoutCounterMax ):
        self.backendObj = backendObj
        self.jobSet = jobSet
        self.entryLock = entryLock
        self.timeoutCounterMax = timeoutCounterMax
        self.timeoutCounter = timeoutCounterMax - 0.01
        self.timeLastUpdate = 0.0

    def updateActionTuple( self ):
        return self.backendObj, self.jobSet, self.entryLock


205 """
206 This serves as the Update Table. Is is meant to be used
207 by wrapping it as a SynchronisedObject so as to ensure thread safety.
208 """
211
212 - def addEntry( self, backendObj, backendCheckingFunction, jobList, timeoutMax = None ):
213 if not jobList:
214 return
215 if timeoutMax is None:
216 timeoutMax = config[ 'default_backend_poll_rate' ]
217 log.debug( "\n*----addEntry()" )
218 backend = backendObj._name
219 try:
220 backendObj, jSet, lock = self.table[ backend ].updateActionTuple()
221 except KeyError:
222 self.table[ backend ] = _DictEntry( backendObj, Set( jobList ), threading.RLock(), timeoutMax )
223 Qin.put( JobAction( backendCheckingFunction, self.table[ backend ].updateActionTuple() ) )
224 log.debug( "**Adding %s to new %s backend entry." % ( [x.id for x in jobList], backend ) )
225 return True
226 else:
227
228
229
230
231 log.debug( "*: backend=%s, isLocked=%s, isOwner=%s, joblist=%s, queue=%s" % (backend, lock._RLock__count, lock._is_owned(), [x.id for x in jobList], Qin.qsize() ) )
232 if lock.acquire( False ):
233 try:
234 jSetSize = len( jSet )
235 log.debug( "Lock acquire successful. Updating jSet %s with %s." % ( [x.id for x in jSet], [x.id for x in jobList] ) )
236 jSet.update( jobList )
237
238
239 if jSetSize:
240 log.debug( "%s backend job set exists. Added %s to it." % ( backend, [x.id for x in jobList] ) )
241 else:
242 Qin.put( JobAction( backendCheckingFunction, self.table[ backend ].updateActionTuple() ) )
243 log.debug( "Added new %s backend update action for jobs %s." % ( backend, [ x.id for x in self.table[ backend ].updateActionTuple()[1] ] ) )
244 finally:
245 lock.release()
246 log.debug( "**: backend=%s, isLocked=%s, isOwner=%s, joblist=%s, queue=%s" % (backend, lock._RLock__count, lock._is_owned(), [x.id for x in jobList], Qin.qsize() ) )
247 return True
248 else:
249 log.debug( "Could not acquire lock for %s backend. addEntry() skipped." % backend )
250 log.debug( "**: backend=%s, isLocked=%s, isOwner=%s, joblist=%s, queue=%s" % (backend, lock._RLock__count, lock._is_owned(), [x.id for x in jobList], Qin.qsize() ) )
251 return False
252
253 - def clearEntry( self, backend ):
254 try:
255 entry = self.table[ backend ]
256 except KeyError:
257 log.error( "Error clearing the %s backend. It does not exist!" % backend )
258 else:
259 entry.jobSet = Set()
260 entry.timeoutCounter = entry.timeoutCounterMax
261
263 for backend, entry in self.table.items():
264
265
266
267
268
269 if not entry.entryLock._RLock__count and \
270 entry.timeoutCounter == entry.timeoutCounterMax and \
271 entry.entryLock.acquire( False ):
272 log.debug( "%s has been reset. Acquired lock to begin countdown." % backend )
273 entry.timeLastUpdate = time.time()
274
275 if entry.entryLock._is_owned():
276 if entry.timeoutCounter <= 0.0:
277 entry.timeoutCounter = entry.timeoutCounterMax - 0.01
278 entry.timeLastUpdate = time.time()
279 entry.entryLock.release()
280 log.debug( "%s backend counter timeout. Resetting to %s." % ( backend, entry.timeoutCounter ) )
281 else:
282 _l = time.time()
283 entry.timeoutCounter -= _l - entry.timeLastUpdate
284 entry.timeLastUpdate = _l
285
286
288 try:
289 return bool( self.table[ backend ].entryLock._RLock__count )
290 except KeyError:
291 return False
292
297
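# Rough illustration (using only names defined above) of how the per-backend poll rate is
# enforced: addEntry() keeps at most one pending update JobAction per backend, while the
# periodic timeoutCheck() hook counts down and frees the entry again once timeoutMax expires:
#
#     updateDict_ts.addEntry( backendObj, checkBackend, jobList, timeoutMax = 30 )
#     updateDict_ts.timeoutCheck()   # invoked periodically by the monitoring loop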
updateDict_ts = SynchronisedObject( UpdateDict() )


class CallbackHookEntry( object ):
    """Descriptor of a callback hook run periodically by the monitoring loop."""
    def __init__( self, argDict, enabled = True, timeout = 0 ):
        self.argDict = argDict
        self.enabled = enabled
        # minimum number of seconds between two runs of the hook
        self.timeout = timeout
        # timestamp of the last run
        self._lastRun = 0

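# For example (sketch): a hook registered below with JobRegistry_Monitor.setCallbackHook( func, {},
# True, timeout = 30 ) is stored as CallbackHookEntry( argDict = {}, enabled = True, timeout = 30 )
# and is then run by __monStep() at most once every 30 seconds.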
311 """Job monitoring service thread."""
312 uPollRate = 0.5
313 minPollRate = 1.0
315 GangaThread.__init__( self, name = "JobRegistry_Monitor" )
316 self.setDaemon( True )
317 self.registry = registry
318 self.__sleepCounter = 0.0
319 self.__updateTimeStamp = time.time()
320 self.progressCallback = lambda x:None
321 self.callbackHookDict = {}
322 self.clientCallbackDict = {}
323 self.alive = True
324 self.enabled = False
325
326 self.steps = -1
327 self.activeBackends = {}
328 self.updateJobStatus = None
329 self.defaultUpdateJobStatus = None
330 self.errors = {}
331
332 self.makeUpdateJobStatusFunction()
333
334
335 for _credObj in Credentials._allCredentials.itervalues():
336 log.debug( "Setting callback hook for %s" % _credObj._name )
337 self.setCallbackHook( self.makeCredCheckJobInsertor( _credObj ), {}, True, timeout=config[ 'creds_poll_rate'] )
338
339
340 log.debug( "Setting callback hook for disk space checking")
341 self.setCallbackHook( self.diskSpaceCheckJobInsertor, {}, True,timeout=config[ 'diskspace_poll_rate'] )
342
343
344
345 self.__mainLoopCond=threading.Condition()
346
347 self.__cleanUpEvent=threading.Event()
348
349 self.__monStepsTerminatedEvent=threading.Event()
350
351 self.stopIter=threading.Event()
352 self.stopIter.set()
353
355 """
356 Main monitoring loop
357 """
358 import thread
359 Ganga.Core.MonitoringComponent.monitoring_thread_id = thread.get_ident()
360 del thread
361
362 while self.alive:
363
364 self.__mainLoopCond.acquire()
365 try:
366 log.debug( 'Monitoring loop lock acquired. Running loop' )
367
368 while not self.enabled:
369 self.__cleanUp()
370 if not self.alive:
371 return
372
373 self.__mainLoopCond.wait()
374
375 self.__monStep()
376
377
378 while self.__sleepCounter > 0.0:
379 self.progressCallback( self.__sleepCounter )
380 self.__mainLoopCond.wait( self.uPollRate )
381 if not self.enabled:
382 if not self.alive:
383 self.__cleanUp()
384
385 break
386 else:
387 self.__sleepCounter -= self.uPollRate
388
389 else:
390
391 if self.steps>0:
392
393 self.steps-=1
394
395 if self.steps<=0:
396 self.enabled=False
397
398 self.__monStepsTerminatedEvent.set()
399 finally:
400 self.__mainLoopCond.release()
401
402
403 self.__cleanUp()
404
406 """
407 A single monitoring step in the monitoring loop
408 Note:
409 Internally the step does not block, it produces *actions* that are queued to be run
410 in the thread pool
411 """
412 if not self.callbackHookDict:
413 log.error( 'No callback hooks registered' )
414 return
415 for cbHookFunc in self.callbackHookDict.keys():
416 try:
417 cbHookEntry = self.callbackHookDict[ cbHookFunc ]
418 except KeyError:
419 continue
420 if cbHookEntry.enabled and time.time()-cbHookEntry._lastRun>=cbHookEntry.timeout:
421 log.debug("Running monitoring callback hook function %s(**%s)" % (cbHookFunc, cbHookEntry.argDict))
422 cbHookFunc(**cbHookEntry.argDict)
423 cbHookEntry._lastRun = time.time()
424
425 self.runClientCallbacks()
426 self.__updateTimeStamp = time.time()
427 self.__sleepCounter = config[ 'base_poll_rate' ]
428
429
431 """
432 Enable/Run the monitoring loop and wait for the monitoring steps completion.
433 Parameters:
434 steps: number of monitoring steps to run
435 timeout: how long to wait for monitor steps termination (seconds)
436 jobs: a registry slice to be monitored (None -> all jobs), it may be passed by the user so ._impl is stripped if needed
437 Return:
438 False, if the loop cannot be started or the timeout occured while waiting for monitoring termination
439 True, if the monitoring steps were successfully executed
440 Note:
441 This method is meant to be used in Ganga scripts to request monitoring on demand.
442 """
443
444 if not type(steps) is int or steps<0:
445 log.warning("The number of monitor steps should be a positive integer")
446 return False
447
448 if not self.alive:
449 log.error("Cannot run the monitoring loop. It has already been stopped")
450 return False
451
452
453 if not Coordinator.servicesEnabled:
454 log.error("Cannot run the monitoring loop."
455 "The internal services are disabled (check your credentials or available disk space)")
456 return False
457
458
459 if not self.enabled:
460
461
462 _missingCreds = Coordinator.getMissingCredentials()
463 if _missingCreds:
464 log.error("Cannot run the monitoring loop. The following credentials are required: %s" % _missingCreds)
465 return False
466
467 self.__mainLoopCond.acquire()
468 log.debug( 'Monitoring loop lock acquired. Enabling mon loop' )
469 try:
470 if self.enabled or self.__isInProgress():
471 log.error("The monitoring loop is already running.")
472 return False
473
474 if jobs:
475 try:
476 m_jobs = jobs._impl
477 except AttributeError:
478 m_jobs = jobs
479
480
481
482 from Ganga.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice
483 if not isinstance(m_jobs,RegistrySlice):
484 log.warning('runMonitoring: jobs argument must be a registry slice such as a result of jobs.select() or jobs[i1:i2]')
485 return False
486
487 self.registry = m_jobs
488
489
490 self.enabled = True
491
492 self.steps = steps
493
494 self.stopIter.clear()
495
496 self.setCallbackHook( updateDict_ts.timeoutCheck, {}, True )
497
498
499 self.__mainLoopCond.notifyAll()
500 finally:
501 self.__mainLoopCond.release()
502
503
504 self.__monStepsTerminatedEvent.wait()
505 self.__monStepsTerminatedEvent.clear()
506
507 if not self.__awaitTermination(timeout):
508 log.warning("Monitoring loop started but did not complete in the given timeout.")
509
510 self.stopIter.set()
511 return False
512 return True
513
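    # Usage sketch (assumption; 'monitor' stands for the JobRegistry_Monitor instance and 'jobs'
    # for the GPI registry slice): on-demand monitoring from a Ganga script could look like
    #
    #     monitor.runMonitoring( steps = 1, timeout = 60, jobs = jobs.select( status = 'running' ) )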
515 """
516 Run the monitoring loop continuously
517 """
518
519 if not self.alive:
520 log.error("Cannot start monitoring loop. It has already been stopped")
521 return False
522
523 self.__mainLoopCond.acquire()
524 log.debug( 'Monitoring loop lock acquired. Enabling mon loop' )
525 try:
526 self.enabled = True
527
528 self.steps=-1
529
530 self.stopIter.clear()
531 log.debug( 'Monitoring loop enabled' )
532
533 self.setCallbackHook( updateDict_ts.timeoutCheck, {}, True )
534 self.__mainLoopCond.notifyAll()
535 finally:
536 self.__mainLoopCond.release()
537 return True
538
540 """
541 Disable the monitoring loop
542 """
543
544 if not self.alive:
545 log.error("Cannot disable monitoring loop. It has already been stopped")
546 return False
547
548 self.__mainLoopCond.acquire()
549 log.debug( 'Monitoring loop lock acquired. Disabling mon loop' )
550 try:
551 self.enabled = False
552 self.steps=-1
553 self.stopIter.set()
554 log.debug( 'Monitoring loop disabled' )
555
556 self.__mainLoopCond.notifyAll()
557 finally:
558 self.__mainLoopCond.release()
559
560 self.__cleanUpEvent.wait()
561 self.__cleanUpEvent.clear()
562 return True
563
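    # Usage sketch (assumption; 'monitor' as above): background polling can be toggled with
    #
    #     monitor.enableMonitoring()    # resume the continuous monitoring loop
    #     monitor.disableMonitoring()   # pause it again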
    def stop( self, fail_cb = None, max_retries = 5 ):
        """
        Stop the monitoring loop
        Parameters:
            fail_cb : if not None, this callback is called if a retry attempt is needed
        """
        if not self.alive:
            log.warning( "Monitoring loop has already been stopped" )
            return False

        self.__mainLoopCond.acquire()
        log.info( 'Stopping the monitoring component...' )
        try:
            # signal the main loop to terminate
            self.alive = False
            self.enabled = False
            self.steps = -1
            self.stopIter.set()
            self.__mainLoopCond.notifyAll()
        finally:
            self.__mainLoopCond.release()

        # wait for the main loop to clean up
        self.__cleanUpEvent.wait()
        self.__cleanUpEvent.clear()

        # stop the update thread pool as well (see stop_and_free_thread_pool above)
        stop_and_free_thread_pool( fail_cb, max_retries )

        return True


619 """
620 Wait for resources to be cleaned up (threads,queue)
621 Returns:
622 False, on timeout
623 """
624 while self.__isInProgress():
625 time.sleep(self.uPollRate)
626 timeout-=self.uPollRate
627 if timeout<=0:
628 return False
629 return True
630
631
    def setCallbackHook( self, func, argDict, enabled, timeout = 0 ):
        log.debug( 'Setting Callback hook function %s.' % func )
        if func in self.callbackHookDict:
            log.debug( 'Replacing existing callback hook function with %s' % func )
        self.callbackHookDict[ func ] = CallbackHookEntry( argDict = argDict, enabled = enabled, timeout = timeout )

    def removeCallbackHook( self, func ):
        log.debug( 'Removing Callback hook function %s.' % func )
        try:
            del self.callbackHookDict[ func ]
        except KeyError:
            log.error( 'Callback hook function does not exist.' )

    def enableCallbackHook( self, func ):
        log.debug( 'Enabling Callback hook function %s.' % func )
        try:
            self.callbackHookDict[ func ].enabled = True
        except KeyError:
            log.error( 'Callback hook function does not exist.' )

    def disableCallbackHook( self, func ):
        log.debug( 'Disabling Callback hook function %s.' % func )
        try:
            self.callbackHookDict[ func ].enabled = False
        except KeyError:
            log.error( 'Callback hook function does not exist.' )

    def runClientCallbacks( self ):
        for clientFunc in self.clientCallbackDict:
            log.debug( 'Running client callback hook function %s(**%s).' % ( clientFunc, self.clientCallbackDict[ clientFunc ] ) )
            clientFunc( **self.clientCallbackDict[ clientFunc ] )

    def setClientCallback( self, clientFunc, argDict ):
        log.debug( 'Setting client callback hook function %s(**%s).' % ( clientFunc, argDict ) )
        try:
            self.clientCallbackDict[ clientFunc ] = argDict
        except KeyError:
            log.error( "Callback hook function not found." )

    def removeClientCallback( self, clientFunc ):
        log.debug( 'Removing client callback hook function %s.' % clientFunc )
        try:
            del self.clientCallbackDict[ clientFunc ]
        except KeyError:
            log.error( "%s not found in client callback dictionary." % clientFunc.__name__ )


    def makeUpdateJobStatusFunction( self, makeActiveBackendsFunc = None ):
        if makeActiveBackendsFunc is None:
            makeActiveBackendsFunc = self.__defaultActiveBackendsFunc

        def checkBackend( backendObj, jobListSet, lock ):
            currentThread = threading.currentThread()
            lock.acquire()
            try:
                log.debug( "[Update Thread %s] Lock acquired for %s" % ( currentThread, backendObj._name ) )
                jobList_fromset = IList( filter( lambda x: x.status in [ 'submitted', 'running' ], jobListSet ), self.stopIter )
                updateDict_ts.clearEntry( backendObj._name )
                try:
                    log.debug( "[Update Thread %s] Updating %s with %s." % ( currentThread, backendObj._name, [ x.id for x in jobList_fromset ] ) )
                    backendObj.master_updateMonitoringInformation( jobList_fromset )

                    # automatic resubmission of failed (sub)jobs, within the configured limits
                    for j in jobList_fromset:
                        if not j.do_auto_resubmit:
                            continue

                        if len( j.subjobs ) == 0:
                            try_resubmit = j.info.submit_counter <= config[ 'MaxNumResubmits' ]
                        else:
                            # do not auto-resubmit if any subjob exceeded the resubmit limit
                            skip = False
                            for s in j.subjobs:
                                if s.info.submit_counter > config[ 'MaxNumResubmits' ]:
                                    skip = True

                            if skip:
                                continue

                            num_com = len( [ s for s in j.subjobs if s.status in [ 'completed' ] ] )
                            num_fail = len( [ s for s in j.subjobs if s.status in [ 'failed' ] ] )

                            # resubmit only while the failure fraction stays below the configured threshold
                            try_resubmit = num_fail > 0 and ( float( num_fail ) / float( num_com + num_fail ) ) < config[ 'MaxFracForResubmit' ]

                        if try_resubmit:
                            if j.backend.check_auto_resubmit():
                                log.warning( 'Auto-resubmit job %d...' % j.id )
                                j.auto_resubmit()

                except BackendError, x:
                    self._handleError( x, x.backend_name, 0 )
                except Exception, x:
                    self._handleError( x, backendObj._name, 1 )
                log.debug( "[Update Thread %s] Flushing registry %s." % ( currentThread, [ x.id for x in jobList_fromset ] ) )
                self.registry._flush( jobList_fromset )
            finally:
                lock.release()
                log.debug( "[Update Thread %s] Lock released for %s." % ( currentThread, backendObj._name ) )

        def f( activeBackendsFunc ):
            activeBackends = activeBackendsFunc()
            for jList in activeBackends.values():
                backendObj = jList[ 0 ].backend
                try:
                    pRate = config[ backendObj._name ]
                except:
                    pRate = config[ 'default_backend_poll_rate' ]

                # the per-backend poll rate is enforced by the timeout counter of the UpdateDict entry
                updateDict_ts.addEntry( backendObj, checkBackend, jList, pRate )

        if makeActiveBackendsFunc == self.__defaultActiveBackendsFunc:
            self.defaultUpdateJobStatus = f
        if self.updateJobStatus is not None:
            self.removeCallbackHook( self.updateJobStatus )
        self.updateJobStatus = f
        self.setCallbackHook( f, { 'activeBackendsFunc': makeActiveBackendsFunc }, True )
        return self.updateJobStatus

    def makeCredCheckJobInsertor( self, credObj ):
        def credCheckJobInsertor():
            def cb_Success():
                self.enableCallbackHook( credCheckJobInsertor )

            def cb_Failure():
                self.enableCallbackHook( credCheckJobInsertor )
                self._handleError( '%s checking failed!' % credObj._name, credObj._name, 1 )

            log.debug( 'Inserting %s checking function to Qin.' % credObj._name )
            _action = JobAction( function = self.makeCredChecker( credObj ),
                                 callback_Success = cb_Success,
                                 callback_Failure = cb_Failure )
            self.disableCallbackHook( credCheckJobInsertor )
            try:
                Qin.put( _action )
            except:
                cb_Failure()
        return credCheckJobInsertor

    def makeCredChecker( self, credObj ):
        def credChecker():
            log.debug( "Checking %s." % credObj._name )
            try:
                s = credObj.renew()
            except Exception, msg:
                return False
            else:
                return s
        return credChecker

    def diskSpaceCheckJobInsertor( self ):
        """Insert the disk space checking task into the monitoring task queue."""
        def cb_Success():
            self.enableCallbackHook( self.diskSpaceCheckJobInsertor )

        def cb_Failure():
            self.disableCallbackHook( self.diskSpaceCheckJobInsertor )
            self._handleError( 'Available disk space checking failed and it has been disabled!', 'DiskSpaceChecker', False )

        log.debug( 'Inserting disk space checking function to Qin.' )
        _action = JobAction( function = Coordinator._diskSpaceChecker,
                             callback_Success = cb_Success,
                             callback_Failure = cb_Failure )
        self.disableCallbackHook( self.diskSpaceCheckJobInsertor )
        try:
            Qin.put( _action )
        except:
            cb_Failure()

    def updateJobs( self ):
        """Request an immediate monitoring step, unless one ran very recently."""
        if time.time() - self.__updateTimeStamp >= self.minPollRate:
            self.__sleepCounter = 0.0
        else:
            self.progressCallback( "Processing... Please wait." )
            log.debug( "Updates too close together... skipping latest update request." )
            self.__sleepCounter = self.minPollRate

    def _handleError( self, x, backend_name, show_traceback ):
        def log_error():
            log.error( 'Problem in the monitoring loop: %s', str( x ) )
            if show_traceback:
                Ganga.Utility.logging.log_user_exception( log )
        bn = backend_name
        self.errors.setdefault( bn, 0 )
        if self.errors[ bn ] == 0:
            log_error()
            if not config[ 'repeat_messages' ]:
                log.info( 'Further error messages from %s handler in the monitoring loop will be skipped.' % bn )
        else:
            if config[ 'repeat_messages' ]:
                log_error()
        self.errors[ bn ] += 1


def _trace( frame, event, arg ):
    # remember the current frame of the thread so that getStackTrace() can report it
    setattr( threading.currentThread(), '_frame', frame )


def getStackTrace():
    import inspect

    try:
        status = "Available threads:\n"

        for worker in ThreadPool:
            status = status + " " + worker.getName() + ":\n"

            # _frame is only present if _trace has been installed as a trace function
            frame = getattr( worker, '_frame', None )
            if frame:
                status = status + "  stack:\n"
                for frame, filename, line, function_name, context, index in inspect.getouterframes( frame ):
                    status = status + "    " + function_name + " @ " + filename + " # " + str( line ) + "\n"

            status = status + "\n"
        print "Queue", Qin.queue
        return status
    finally:
        pass
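# Note (assumption): _trace() is only effective if it has been installed beforehand, e.g. with
# threading.settrace( _trace ), so that each worker thread records its current frame; a stack
# report can then be obtained interactively with
#
#     print getStackTrace()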
875