1 from Ganga.Utility.logging import getLogger
2 import time
3
4 logger = getLogger('GangaThread')
5
7
# Tuple of attribute names. The trailing comma is required: without it the
# parentheses are plain grouping and the value is just the str
# 'SHUTDOWN_TIMEOUT' (iterating it would yield single characters).
# SHUTDOWN_TIMEOUT itself is read by shutdown() as the per-wait join timeout.
_attributes = ('SHUTDOWN_TIMEOUT',)
9
10
11 _instance = None
12
22
23 getInstance = SingletonHelper()
24
34
42
44 logger.debug('service thread "%s" deleted from the GangaThreadPool',t.getName())
45 try:
46 self.__threads.remove(t)
47 except ValueError,e:
48 logger.debug(str(e))
49 pass
50
def shutdown(self, should_wait_cb=None):
    """Shutdown the Ganga session.

    A daemon helper thread is started to run ``self.__do_shutdown__`` (which
    is documented as calling stop() on each GangaThread and waiting for them
    all to die). This method then waits for that helper in slices of
    ``self.SHUTDOWN_TIMEOUT`` seconds. After every slice in which the helper
    is still alive, ``should_wait_cb`` (if given) is asked whether to keep
    waiting or to shut down anyway.

    @param should_wait_cb: Optional callback with the following signature
        should_wait_cb(total_time, critical_thread_ids, non_critical_thread_ids)
        where
            total_time is the time in seconds since shutdown started
            critical_thread_ids is a list of alive critical thread names
            non_critical_thread_ids is a list of alive non-critical thread names
        and
            the return value is evaluated as a boolean: a true value means
            "keep waiting", a false value means "stop waiting and shut down
            anyway".
        When no callback is supplied the loop keeps waiting until the helper
        thread has finished.
    """
    # Logger arguments are passed lazily (as elsewhere in this file) so the
    # message is only formatted when debug output is enabled, and a formatting
    # problem in a log line cannot abort the shutdown itself.
    logger.debug('shutting down GangaThreadPool with timeout %d sec', self.SHUTDOWN_TIMEOUT)

    # Run the real shutdown work in a background daemon thread so this method
    # can stop waiting for it when the callback says so.
    import threading
    shutdown_thread = threading.Thread(target=self.__do_shutdown__, name='GANGA_Update_Thread_shutdown')
    shutdown_thread.setDaemon(True)
    shutdown_thread.start()

    t_start = time.time()

    # Wait for the background shutdown thread, one timeout slice at a time.
    while shutdown_thread.isAlive():
        logger.debug('Waiting for max %d seconds for threads to finish', self.SHUTDOWN_TIMEOUT)
        logger.debug('There are %d alive background threads', self.__cnt_alive_threads__())
        shutdown_thread.join(self.SHUTDOWN_TIMEOUT)

        if not shutdown_thread.isAlive():
            # The helper finished within this slice: clean shutdown.
            logger.debug('GangaThreadPool shutdown properly')
            break

        # Still running after the timeout: ask the caller (if a callback was
        # supplied) whether it is worth waiting any longer.
        if should_wait_cb:
            total_time = time.time() - t_start
            critical_thread_ids = self.__alive_critical_thread_ids()
            non_critical_thread_ids = self.__alive_non_critical_thread_ids()
            if not should_wait_cb(total_time, critical_thread_ids, non_critical_thread_ids):
                logger.debug('GangaThreadPool shutdown anyway after %d sec.', time.time() - t_start)
                break

    # Warn if any critical thread is still alive at this point.
    critical_thread_ids = self.__alive_critical_thread_ids()
    if critical_thread_ids:
        logger.warning('Shutdown forced. %d background thread(s) still running: %s', len(critical_thread_ids), critical_thread_ids)

    # Surviving non-critical threads are only reported at debug level.
    non_critical_thread_ids = self.__alive_non_critical_thread_ids()
    if non_critical_thread_ids:
        logger.debug('Shutdown forced. %d non-critical background thread(s) still running: %s', len(non_critical_thread_ids), non_critical_thread_ids)

    # NOTE(review): this binds ``_instance`` on the instance only; if the
    # singleton reference is held on the class it is not cleared here --
    # confirm whether that is intended before changing it.
    self._instance = None
    # Forget all registered threads.
    self.__threads = []
111
113 """Return a list of alive critical thread names."""
114 return [t.getName() for t in self.__threads if t.isAlive() and t.isCritical()]
115
117 """Return a list of alive non-critical thread names."""
118 return [t.getName() for t in self.__threads if t.isAlive() and not t.isCritical()]
119
139
141
142 num_alive_threads = 0
143 for t in self.__threads:
144 if t.isAlive():
145 num_alive_threads += 1
146
147 return num_alive_threads
148