Package Ganga :: Package Core :: Package GangaThread :: Package MTRunner :: Module MTRunner'
[hide private]
[frames | no frames]

Source Code for Module Ganga.Core.GangaThread.MTRunner.MTRunner'

  1  #!/usr/bin/env python 
  2  import time 
  3  import traceback 
  4  from threading import Lock 
  5  from Queue import Empty 
  6  from Algorithm import AlgorithmError 
  7  from Ganga.Core.GangaThread.GangaThread import GangaThread 
  8  from Ganga.Core.GangaThread.MTRunner.Data import DuplicateDataItemError  
  9  from Ganga.Utility.logging import getLogger 
 10   
class MTRunnerError(Exception):
    """
    Class for general MTRunner errors.

    The message is kept on the 'message' attribute and is also handed to the
    base Exception, so that str(error) and error.args carry it as well.
    """

    def __init__(self, message):
        """
        initializes the error.

        @param message is the human-readable description of the error
        """
        # Without the base-class call, str(error) is empty and error.args is ().
        Exception.__init__(self, message)
        self.message = message
18
class GangaWorkAgent(GangaThread):
    """
    Worker thread used by MTRunner.

    Each agent repeatedly takes the next item from the runner's data object,
    hands it to the runner's algorithm and, when the algorithm returns a true
    value, appends the item to the runner's doneList.
    """

    def __init__(self, runnerObj, name):
        """
        initializes the worker agent.

        @param runnerObj is the runner object providing 'data', 'algorithm',
                         'lock', 'doneList' and 'keepAlive'
        @param name is the name given to the thread
        """
        GangaThread.__init__(self, name=name)
        self._runner = runnerObj

    def run(self):
        """
        processes data items until the agent is asked to stop, the data queue
        is empty (and the runner is not in keepAlive mode), or the algorithm
        raises NotImplementedError or AlgorithmError.

        Any other exception raised while processing a single item is printed
        and the agent carries on with the next item.
        """

        logger = getLogger('Ganga.Core.GangaThread.MTRunner')
        runner = self._runner

        try:
            while not self.should_stop():

                if runner.data.isEmpty():
                    if runner.keepAlive:
                        ## nothing to do for the moment; poll again in 0.5 sec.
                        time.sleep(0.5)
                        continue

                    logger.debug('data queue is empty, stop worker')
                    break

                try:
                    item = runner.data.getNextItem()
                    logger.debug( 'worker %s get item %s' % (self.getName(), item) )

                    if runner.algorithm.process(item):
                        ## doneList is shared by all agents of the runner
                        runner.lock.acquire()
                        try:
                            runner.doneList.append(item)
                        finally:
                            runner.lock.release()
                except NotImplementedError:
                    break
                except AlgorithmError:
                    break
                except Empty:
                    ## another agent took the last item between isEmpty() and getNextItem()
                    pass
                except Exception:
                    ## a failure on one item must not kill the worker; SystemExit and
                    ## KeyboardInterrupt are deliberately not swallowed here
                    traceback.print_exc()
        finally:
            ## always unregister, even if the loop is left by an unexpected exception
            self.unregister()
69 70
class MTRunner:
    """
    Class to handle multiple concurrent threads running on the same algorithm.

    @since: 0.0.1
    @author: Hurng-Chun Lee
    @contact: hurngchunlee@gmail.com

    The runner is not a thread itself: start() creates 'numThread' worker
    agents (GangaWorkAgent threads) which process the data items in parallel.
    To run it, do the following:

         runner = MTRunner('myRunner', algorithm=myAlgorithm, data=myData)
         runner.start()
         ... you can do something in parallel in your main program ...
         runner.join()

    where 'myAlgorithm' and 'myData' are two objects defining your own
    algorithm running on a dataset.
    """

    _attributes = ('name', 'algorithm', 'data', 'numThread', 'doneList', 'lock', 'keepAlive')

    def __init__(self, name, algorithm=None, data=None, numThread=10, keepAlive=False):
        """
        initializes the MTRunner object.

        @since: 0.0.1
        @author: Hurng-Chun Lee
        @contact: hurngchunlee@gmail.com

        @param name is the runner name, used as prefix of the worker agent names
        @param algorithm is an Algorithm object defining how to process on the data
        @param data is an Data object defining what to be processed by the algorithm
        @param numThread is the number of worker agents created by start()
        @param keepAlive tells the worker agents to keep polling for new items
                         instead of stopping when the data queue is empty
        @raise MTRunnerError if algorithm or data is None
        """

        ## test against None explicitly: an object that merely evaluates to
        ## False (e.g. a still-empty data container) is a valid argument
        if algorithm is None or data is None:
            raise MTRunnerError('algorithm and data must not be None')

        self.algorithm = algorithm
        self.data      = data
        self.numThread = numThread
        self.doneList  = []
        self.lock      = Lock()
        self.name      = name
        self.keepAlive = keepAlive
        self._agents   = []
        self.logger    = getLogger('GangaThread')

    def getDoneList(self):
        """
        gets the data items that have been processed correctly by the algorithm.
        """
        return self.doneList

    def getResults(self):
        """
        gets the overall results (e.g. output) from the algorithm.
        """
        return self.algorithm.getResults()

    def addDataItem(self, item):
        """
        adds a new data item into the internal queue; an item rejected with
        DuplicateDataItemError is skipped with a debug message.
        """
        try:
            self.data.addItem(item)
        except DuplicateDataItemError as e:
            self.logger.debug('skip adding new item: %s' % e.message)

    def start(self):
        """
        starts the MTRunner by creating and starting 'numThread' worker agents.
        """

        for i in range(self.numThread):
            t = GangaWorkAgent( runnerObj=self, name='%s_worker_agent_%d' % (self.name, i) )
            self._agents.append(t)
            t.start()

    def join(self, timeout=-1):
        """
        joins the worker agents.

        The caller will be blocked until exceeding the timeout or all worker agents finish their jobs.

        @param timeout is the maximum waiting time in seconds; a negative value
                       or None means waiting without time limit
        """

        limited = timeout is not None and timeout >= 0

        try:
            t_start = time.time()

            while self.__cnt_alive_threads__() > 0:

                if not limited:
                    ## check again in 0.5 second
                    time.sleep(0.5)
                    continue

                remaining = timeout - (time.time() - t_start)

                ## break the loop if exceeding the timeout
                if remaining < 0:
                    break

                ## never sleep past the deadline
                time.sleep(min(0.5, remaining))

        except KeyboardInterrupt:
            self.logger.error('Keyboard interruption on MTRunner: %s' % self.name)

    def stop(self, timeout=-1):
        """
        asks all worker agents to stop and waits for them to finish.

        @param timeout is passed on to join()
        """

        ## ask all agents to stop
        for agent in self._agents:
            agent.stop()

        self.join(timeout=timeout)

    def __cnt_alive_threads__(self):
        """
        counts the worker agents that are still alive.
        """
        return sum(1 for t in self._agents if t.isAlive())
193