1
2 import time
3 import traceback
4 from threading import Lock
5 from Queue import Empty
6 from Algorithm import AlgorithmError
7 from Ganga.Core.GangaThread.GangaThread import GangaThread
8 from Ganga.Core.GangaThread.MTRunner.Data import DuplicateDataItemError
9 from Ganga.Utility.logging import getLogger
10
12 """
13 Class for general MTRunner errors.
14 """
15
17 self.message = message
18
20
24
26
27 logger = getLogger('Ganga.Core.GangaThread.MTRunner')
28
29 while not self.should_stop():
30
31 if self._runner.data.isEmpty():
32
33 if self._runner.keepAlive:
34
35
36 time.sleep(0.5)
37 continue
38 else:
39 logger.debug('data queue is empty, stop worker')
40 break
41 else:
42 try:
43 item = self._runner.data.getNextItem()
44
45
46
47
48
49
50
51
52 logger.debug( 'worker %s get item %s' % (self.getName(), item) )
53 rslt = self._runner.algorithm.process(item)
54 if rslt:
55 self._runner.lock.acquire()
56 self._runner.doneList.append(item)
57 self._runner.lock.release()
58 except NotImplementedError:
59 break
60 except AlgorithmError:
61 break
62 except Empty:
63 pass
64 except:
65 traceback.print_exc()
66 pass
67
68 self.unregister()
69
70
72 """
73 Class to handle multiple concurrent threads running on the same algorithm.
74
75 @since: 0.0.1
76 @author: Hurng-Chun Lee
77 @contact: hurngchunlee@gmail.com
78
79 The class itself is a thread. To run it, do the following:
80
81 runner = MTRunner('myRunner', myAlgorithm, myData)
82 runner.start()
83 ... you can do something in parallel in your main program ...
84 runner.join()
85
86 where 'myAlgorithm' and 'myData' are two objects defining your own
87 algorithm running on a dataset.
88 """
89
90 _attributes = ('name', 'algorithm', 'data', 'numThread', 'doneList', 'lock', 'keepAlive')
91
def __init__(self, name, algorithm=None, data=None, numThread=10, keepAlive=False):
    """
    initializes the MTRunner object.

    @since: 0.0.1
    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com

    @param name is a label for this runner; it prefixes the worker agent
           thread names and appears in log messages
    @param algorithm is an Algorithm object defining how to process on the data
    @param data is a Data object defining what to be processed by the algorithm
    @param numThread is the number of worker agents created when the runner starts
    @param keepAlive if True, idle worker agents keep polling an empty data
           queue for new items instead of exiting

    @raise MTRunnerError if algorithm or data is None
    """

    # Compare against None explicitly: the error message promises a None
    # check, and a truthiness test would also reject a valid but "falsy"
    # object (e.g. a Data container that defines __len__ and starts empty,
    # a legitimate starting state when keepAlive is used and items are
    # added later).
    if algorithm is None or data is None:
        raise MTRunnerError('algorithm and data must not be None')

    self.algorithm = algorithm
    self.data = data
    self.numThread = numThread
    # items the algorithm reported as successfully processed; the worker
    # agents append to it while holding self.lock
    self.doneList = []
    self.lock = Lock()
    self.name = name
    self.keepAlive = keepAlive
    # worker agent threads, filled in when the runner is started
    self._agents = []
    self.logger = getLogger('GangaThread')
116
118 """
119 gets the data items that have been processed correctly by the algorithm.
120 """
121 return self.doneList
122
124 """
125 gets the overall results (e.g. output) from the algorithm.
126 """
127 return self.algorithm.getResults()
128
130 """
131 adds a new data item into the internal queue
132 """
133 try:
134 self.data.addItem(item)
135 except DuplicateDataItemError, e:
136 self.logger.debug('skip adding new item: %s' % e.message)
137 pass
138
140 """
141 starts the MTRunner
142 """
143
144 for i in range(self.numThread):
145 t = GangaWorkAgent( runnerObj=self, name='%s_worker_agent_%d' % (self.name, i) )
146 self._agents.append(t)
147 t.start()
148
def join(self, timeout=-1):
    """
    joins the worker agents.

    The caller will be blocked until exceeding the timeout or all worker agents finish their jobs.

    @param timeout maximum number of seconds to wait; a negative value
           (the default) waits until every worker agent has finished
    """

    # Agents are polled instead of joined directly, presumably so the wait
    # stays responsive to KeyboardInterrupt (handled below).
    poll_interval = 0.5

    try:
        deadline = None
        if timeout >= 0:
            deadline = time.time() + timeout

        while self.__cnt_alive_threads__() > 0:
            if deadline is None:
                time.sleep(poll_interval)
                continue

            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Never nap past the deadline: a fixed 0.5 s sleep would overshoot
            # a short timeout by up to half a second.
            time.sleep(min(poll_interval, remaining))

    except KeyboardInterrupt:
        self.logger.error('Keyboard interruption on MTRunner: %s' % self.name)
172
def stop(self, timeout=-1):
    """
    signals every worker agent to stop, then waits for them via join().

    @param timeout maximum number of seconds join() may block; a negative
           value waits until all worker agents have finished
    """
    # Request a stop from each agent first (the worker loop checks
    # should_stop()), and only afterwards block in join().
    for worker in self._agents:
        worker.stop()
    self.join(timeout=timeout)
183
184
186
187 num_alive_threads = 0
188 for t in self._agents:
189 if t.isAlive():
190 num_alive_threads += 1
191
192 return num_alive_threads
193