Package Ganga :: Package Core :: Package GangaThread :: Module GangaThreadPool'
[hide private]
[frames | no frames]

Source Code for Module Ganga.Core.GangaThread.GangaThreadPool'

  1  from Ganga.Utility.logging import getLogger 
  2  import time 
  3   
  4  logger = getLogger('GangaThread') 
  5   
class GangaThreadPool:
    """Singleton registry of service threads that are shut down together.

    Threads are registered with addServiceThread() and removed with
    delServiceThread().  Every registered object is expected to provide
    getName(), isAlive(), isCritical() and stop(); those are the only
    methods this class calls on it.

    Use GangaThreadPool.getInstance() to obtain the shared pool; calling the
    constructor directly while a pool exists raises RuntimeError.
    """

    # The trailing comma matters: without it this is a plain string rather
    # than a one-element tuple.
    _attributes = ('SHUTDOWN_TIMEOUT',)

    # GangaThreadPool singleton instance
    _instance = None

    class SingletonHelper:
        """Callable returning the shared pool, creating it on first use."""

        def __call__(self, *args, **kw):
            """Return the singleton GangaThreadPool (arguments are ignored)."""
            if GangaThreadPool._instance is None:
                GangaThreadPool._instance = GangaThreadPool()
            return GangaThreadPool._instance

    getInstance = SingletonHelper()

    def __init__(self):
        """Create the pool and register it as the singleton.

        Raises:
            RuntimeError: if a GangaThreadPool instance already exists.
        """
        if GangaThreadPool._instance is not None:
            raise RuntimeError('Only one instance of GangaThreadPool is allowed!')

        GangaThreadPool._instance = self

        # Seconds to wait on the shutdown thread between two consultations
        # of the should_wait_cb callback (see shutdown()).
        self.SHUTDOWN_TIMEOUT = 1
        self.__threads = []

    def addServiceThread(self, t):
        """Register thread ``t`` with the pool.

        A thread that is already registered is ignored (with a debug
        message), so that it is stopped and counted only once at shutdown.
        """
        if t in self.__threads:
            logger.debug('service thread "%s" is already in the GangaThreadPool', t.getName())
            return

        self.__threads.append(t)
        logger.debug('service thread "%s" added to the GangaThreadPool', t.getName())

    def delServiceThread(self, t):
        """Unregister thread ``t``; an unknown thread is only logged."""
        logger.debug('service thread "%s" deleted from the GangaThreadPool', t.getName())
        try:
            self.__threads.remove(t)
        except ValueError as e:
            # t was never registered (or was already removed): nothing to do.
            logger.debug(str(e))

    def shutdown(self, should_wait_cb=None):
        """Shutdown the Ganga session.

        @param should_wait_cb: A callback function with the following signature
            should_wait_cb(total_time, critical_thread_ids, non_critical_thread_ids)
          where
            total_time is the time in seconds since shutdown started
            critical_thread_ids is a list of alive critical thread names
            non_critical_thread_ids is a list of alive non-critical threads names.
          and
            return value is evaluated as a boolean.

        A shutdown thread is started that calls stop() on each GangaThread and
        waits for them all to die. A loop waits for the shutdown thread to
        die, periodically calling the should_wait_cb function to ask if it
        should continue to wait or shutdown anyway.  Without a callback the
        loop keeps waiting until the shutdown thread has finished.

        On return the pool's thread list is emptied and the singleton is
        released, so getInstance() will build a fresh pool.
        """
        logger.debug('shutting down GangaThreadPool with timeout %d sec', self.SHUTDOWN_TIMEOUT)

        # run shutdown thread in background
        import threading
        shutdown_thread = threading.Thread(target=self.__do_shutdown__,
                                           name='GANGA_Update_Thread_shutdown')
        # Daemon thread: it must not keep the interpreter alive if the
        # caller decides to stop waiting.
        shutdown_thread.daemon = True
        shutdown_thread.start()

        t_start = time.time()

        # wait for the background shutdown thread to finish
        while shutdown_thread.is_alive():
            logger.debug('Waiting for max %d seconds for threads to finish', self.SHUTDOWN_TIMEOUT)
            logger.debug('There are %d alive background threads', self.__cnt_alive_threads__())
            shutdown_thread.join(self.SHUTDOWN_TIMEOUT)

            if not shutdown_thread.is_alive():
                logger.debug('GangaThreadPool shutdown properly')
                break

            # if should_wait_cb callback exists then ask if we should wait
            if should_wait_cb:
                total_time = time.time() - t_start
                critical_thread_ids = self.__alive_critical_thread_ids()
                non_critical_thread_ids = self.__alive_non_critical_thread_ids()
                if not should_wait_cb(total_time, critical_thread_ids, non_critical_thread_ids):
                    logger.debug('GangaThreadPool shutdown anyway after %d sec.',
                                 time.time() - t_start)
                    break

        # log warning message if critical thread still alive
        critical_thread_ids = self.__alive_critical_thread_ids()
        if critical_thread_ids:
            logger.warning('Shutdown forced. %d background thread(s) still running: %s',
                           len(critical_thread_ids), critical_thread_ids)

        # log debug message if non-critical thread still alive
        non_critical_thread_ids = self.__alive_non_critical_thread_ids()
        if non_critical_thread_ids:
            logger.debug('Shutdown forced. %d non-critical background thread(s) still running: %s',
                         len(non_critical_thread_ids), non_critical_thread_ids)

        # set singleton instance to None; this has to be done on the class,
        # assigning to self._instance would only shadow the class attribute.
        if GangaThreadPool._instance is self:
            GangaThreadPool._instance = None
        self.__threads = []

    def __alive_critical_thread_ids(self):
        """Return a list of alive critical thread names."""
        return [t.getName() for t in self.__threads if t.isAlive() and t.isCritical()]

    def __alive_non_critical_thread_ids(self):
        """Return a list of alive non-critical thread names."""
        return [t.getName() for t in self.__threads if t.isAlive() and not t.isCritical()]

    def __do_shutdown__(self):
        """Call stop() on every registered thread, then poll until none is alive.

        This is the target of the background thread started by shutdown().
        """
        logger.debug('Service threads to shutdown: %s' % list(self.__threads))

        # shutdown each individual thread in the pool; iterate over a copy so
        # that a thread unregistering itself meanwhile cannot make the loop
        # skip one of its neighbours.
        for t in self.__threads[:]:
            logger.debug('shutting down Thread: %s' % t.getName())
            t.stop()
            logger.debug('shutdown Thread: %s' % t.getName())

        # counting the number of alive threads
        num_alive_threads = self.__cnt_alive_threads__()

        while num_alive_threads > 0:
            # fix for bug #62543 https://savannah.cern.ch/bugs/?62543
            # the log call comes before the sleep so that we access no
            # globals between sleep and exit test
            logger.debug('number of alive threads: %d' % num_alive_threads)
            time.sleep(0.3)
            num_alive_threads = self.__cnt_alive_threads__()

    def __cnt_alive_threads__(self):
        """Return the number of registered threads that are still alive.

        Deliberately a plain loop that touches no module globals or
        builtins: it runs right after the sleep in __do_shutdown__ (see the
        bug #62543 note there).
        """
        num_alive_threads = 0
        for t in self.__threads:
            if t.isAlive():
                num_alive_threads += 1

        return num_alive_threads
148