Package Ganga :: Package Core :: Package GangaThread :: Package MTRunner :: Module Data'
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Core.GangaThread.MTRunner.Data'

 1  #!/usr/bin/env python 
 2  from Queue import Queue 
 3  import threading 
 4   
5 -class DuplicateDataItemError(Exception):
6 """ 7 Class raised when adding the same item in the Data object. 8 """ 9
10 - def __init__(self, message):
11 self.message = message
12 13
14 -class Data:
15 """ 16 Class to define user dataset collection. 17 """ 18 19 _attributes = ('collection','queue') 20
21 - def __init__(self, collection=[]):
22 23 self.collection = collection 24 self.queue = Queue(maxsize=-1) 25 self.lock = threading.Lock() 26 27 for item in collection: 28 self.queue.put(item)
29
30 - def getCollection(self):
31 return self.collection
32
33 - def isEmpty(self):
34 ''' 35 checks if the bounded queue is empty. 36 ''' 37 return self.queue.empty()
38
39 - def addItem(self, item):
40 ''' 41 try to put a new item in the queue. As the queue is defined with infinity number 42 of slots, it should never throw "Queue.Full" exception. 43 ''' 44 45 self.lock.acquire() 46 try: 47 if item not in self.collection: 48 self.collection.append(item) 49 50 #f = open('/tmp/hclee/mt_tasks.log', 'a') 51 #f.write( '%s \n' % str(item) ) 52 #f.close() 53 54 self.queue.put(item) 55 else: 56 raise DuplicateDataItemError('data item \'%s\' already in the task queue' % str(item)) 57 finally: 58 self.lock.release()
59
60 - def getNextItem(self):
61 ''' 62 try to get the next item in the queue after waiting in max. 1 sec. 63 64 if nothing available, the exception "Queue.Empty" will be thrown. 65 ''' 66 67 theItem = None 68 69 self.lock.acquire() 70 try: 71 theItem = self.queue.get(block=True, timeout=1) 72 finally: 73 self.lock.release() 74 75 return theItem
76