Package Ganga :: Package Utility :: Package AMGAServerTools :: Module PipeReader
[hide private]
[frames] | no frames]

Source Code for Module Ganga.Utility.AMGAServerTools.PipeReader

  1  #!/usr/bin/env python2.2 
  2  #---------------------------------------------------------------------------- 
  3  # Name:         PipeReader.py 
  4  # Purpose:      Read from pipe in non blocking manner. 
  5  # 
  6  # Author:       Alexander Soroko 
  7  # 
  8  # Created:      20/10/2003      
  9  #---------------------------------------------------------------------------- 
 10   
 11  """Read from pipe in non blocking manner. It is portable to both 
 12  posix and windows environments.""" 
 13   
 14  import os 
 15  import time 
 16  import Queue 
 17  import threading 
 18   
 19  MIN_TIMEOUT = 0.01 
 20   
 21  ################################################################################ 
22 -class PipeReader:
23
24 - def __init__(self, readfile, timeout=None, pipesize=0, blocksize=1024):
25 """Initialise a non-blocking pipe object, given a real file readfile. 26 timeout = the default timeout (in seconds) at which read will decide 27 that there is no more data in the queue. 28 timeout = None, or < 0 stands for indefinite waiting time. 29 pipesize = the size (in blocks) of the queue used to buffer the 30 blocks read 31 blocksize = the maximum block size for a raw read.""" 32 33 self.rfile = readfile 34 # default timeout allowed between blocks 35 if timeout: 36 self.timeout = timeout 37 else: 38 self.timeout = -1 39 self.pipesize = pipesize 40 self.blocksize = blocksize 41 self._q = Queue.Queue(self.pipesize) 42 self._stop = threading.Event() 43 self._stop.clear() 44 self._thread = threading.Thread(target = self._readtoq) 45 self._thread.start()
46 47 #-------------------------------------------------------------------------------
48 - def __del__(self):
49 self.stop()
50 51 #-------------------------------------------------------------------------------
52 - def stop(self):
53 self._stop.set() 54 self._thread.join() 55 56 #-------------------------------------------------------------------------------
57 - def _readtoq(self):
58 try: 59 while 1: 60 item = self.rfile.read(self.blocksize) 61 if item == '': 62 break 63 else: 64 self._q.put(item) 65 if self._stop.isSet(): 66 break 67 except: 68 return
69 70 #-------------------------------------------------------------------------------
71 - def has_data(self):
72 return not self._q.empty()
73 74 #-------------------------------------------------------------------------------
75 - def isReading(self):
76 return self._thread.isAlive()
77 78 #-------------------------------------------------------------------------------
79 - def instantRead(self, maxblocks=0):
80 """Read data from the queue, to a maximum of maxblocks (0 = infinite). 81 Does not block.""" 82 data = '' 83 blockcount = 0 84 while self.has_data(): 85 data += self._q.get() 86 blockcount += 1 87 if blockcount == maxblocks: 88 break 89 return data
90 91 #-------------------------------------------------------------------------------
92 - def read(self, maxblocks=0, timeout=None, condition = None):
93 """Read data from the queue, allowing timeout seconds between block arrival. 94 if timeout = None, then use default timeout. If timeout<0, 95 then wait indefinitely. 96 Returns '' if we are at the EOF, or no data turns up within the timeout. 97 If condition is not None and condition() == False returns. 98 Reads at most maxblocks (0 = infinite). 99 Does not block.""" 100 def keepReading(endtime): 101 if endtime < 0: 102 return 1 103 else: 104 return time.time() < endtime
105 106 data = '' 107 blockcount = 0 108 if timeout == None: 109 timeout = self.timeout 110 111 if timeout < 0: 112 endtime = -1 113 else: 114 endtime = time.time() + timeout 115 116 while keepReading(endtime): 117 block = self.instantRead(1) 118 if block != '': 119 blockcount += 1 120 data += block 121 if blockcount == maxblocks: 122 break 123 if endtime != -1: 124 endtime = time.time() + timeout #reset endtime 125 continue 126 else: 127 time.sleep(MIN_TIMEOUT) 128 if self.isReading(): 129 if condition and condition() or not condition: 130 continue 131 else: 132 # process exited 133 # take a chance of reading data 134 time.sleep(MIN_TIMEOUT) 135 # stopping 136 if not self.has_data(): 137 break 138 139 return data 140