1
2
3
4
5
6
7
8
9
10
"""Read from a pipe in a non-blocking manner.

Portable to both POSIX and Windows environments."""
13
14 import os
15 import time
16 import Queue
17 import threading
18
19 MIN_TIMEOUT = 0.01
20
21
23
def __init__(self, readfile, timeout=None, pipesize=0, blocksize=1024):
    """Wrap the real file object readfile in a non-blocking reader.

    A background thread is created and started straight away; it runs
    self._readtoq, which buffers the blocks it reads in an internal queue.

    readfile  = the file object the reader thread reads from.
    timeout   = the default time (in seconds) read() waits for another
                block before deciding that no more data is coming.
                Any false value (None, but also 0) is stored as -1;
                read() treats every negative timeout as "wait
                indefinitely".
    pipesize  = the capacity (in blocks) of the buffering queue; handed
                unchanged to Queue.Queue, for which 0 means unbounded.
    blocksize = the maximum size requested from one raw read.
    """
    self.rfile = readfile
    self.blocksize = blocksize
    self.pipesize = pipesize

    # A false timeout (None or 0) is normalised to the -1 sentinel.
    self.timeout = timeout if timeout else -1

    self._q = Queue.Queue(self.pipesize)

    # Flag raised by stop() to ask the reader thread to finish.
    self._stop = threading.Event()
    self._stop.clear()

    reader = threading.Thread(target=self._readtoq)
    self._thread = reader
    reader.start()
46
47
49 self.stop()
50
51
def stop(self):
    """Ask the reader thread to finish, then wait until it has exited.

    The reader thread looks at the stop flag only after it has queued a
    block, so this call lasts as long as the raw read that is currently
    in progress."""
    stop_flag, reader = self._stop, self._thread
    stop_flag.set()
    reader.join()
55
56
def _readtoq(self):
    """Body of the reader thread: copy blocks from the file to the queue.

    Reads up to self.blocksize at a time from self.rfile and queues each
    block.  The loop ends at EOF (an empty read), when the stop flag has
    been set, or when reading/queueing raises an exception.
    """
    try:
        while True:
            item = self.rfile.read(self.blocksize)
            if item == '':
                # Empty read: end of file.
                break

            # Queue the block.  On a bounded queue that is full, a plain
            # blocking put() would never notice the stop flag and stop()
            # would hang in join(); so wait in short slices and give up
            # on the block once a stop has been requested.
            while True:
                try:
                    self._q.put(item, True, MIN_TIMEOUT)
                    break
                except Queue.Full:
                    if self._stop.is_set():
                        break

            if self._stop.is_set():
                break
    except Exception:
        # Best effort: any failure (e.g. the file being closed underneath
        # the thread) simply ends the reader thread.
        return
69
70
def has_data(self):
    """Return True when at least one block is waiting in the queue."""
    if self._q.empty():
        return False
    return True
73
74
def isReading(self):
    """Return True while the background reader thread is still running."""
    # is_alive() is the supported spelling (available since Python 2.6);
    # the camelCase alias isAlive() no longer exists on Python 3.9+.
    return self._thread.is_alive()
77
78
def instantRead(self, maxblocks=0):
    """Return the data already waiting in the queue, without blocking.

    maxblocks = the largest number of blocks to take (0 = no limit).

    Returns the blocks joined into one string, or '' when the queue is
    empty.
    """
    chunks = []
    while self.has_data():
        try:
            # Never wait here: if the block seen by has_data() is gone by
            # the time we fetch it, a blocking get() would stall a call
            # that promises not to block.
            chunks.append(self._q.get_nowait())
        except Queue.Empty:
            break
        if len(chunks) == maxblocks:
            break
    return ''.join(chunks)
90
91
def read(self, maxblocks=0, timeout=None, condition=None):
    """Collect blocks from the queue, waiting for new ones to arrive.

    maxblocks = stop once this many blocks have been collected
                (0 = no limit).
    timeout   = seconds to wait for the next block; the allowance starts
                afresh every time a block arrives.  None selects
                self.timeout, a negative value waits indefinitely, and
                0 polls the queue and returns what is already buffered.
    condition = optional callable, polled while the reader thread is
                still running; once it returns a false value read()
                stops waiting and returns as soon as the queue is empty.

    Returns the collected data as one string; '' if we are at EOF or no
    data turned up within the timeout.

    Unlike instantRead(), this call can block: for up to timeout seconds
    per block, or until the reader thread finishes when the timeout is
    negative.
    """
    if timeout is None:
        timeout = self.timeout
    wait_forever = timeout < 0
    if not wait_forever:
        deadline = time.time() + timeout

    chunks = []
    while True:
        block = self.instantRead(1)
        if block != '':
            chunks.append(block)
            if len(chunks) == maxblocks:
                break
            if not wait_forever:
                # Every arrival restarts the allowance.
                deadline = time.time() + timeout
            continue

        # Nothing buffered right now.  The deadline is consulted only
        # after polling, so that timeout=0 still returns queued data.
        if not wait_forever and time.time() >= deadline:
            break

        time.sleep(MIN_TIMEOUT)
        if self.isReading() and (not condition or condition()):
            continue

        # Either the reader thread has finished or the caller's condition
        # no longer holds: allow one short pause for a block that may
        # still be on its way, then stop unless something is queued.
        time.sleep(MIN_TIMEOUT)
        if not self.has_data():
            break

    return ''.join(chunks)
140