I have a class that subclasses threading.Thread. Its sole responsibility is to put messages read from a UNIX named pipe onto a queue.Queue object (so that other threads can process these values later).
Example code:
    from threading import Event, Thread

    class PipeReaderThread(Thread):
        def __init__(self, results_queue, pipe_path):
            Thread.__init__(self)
            self._stop_event = Event()
            self._results_queue = results_queue
            self._pipe_path = pipe_path

        def run(self):
            while not self._stop_event.is_set():
                # open() blocks here until some process opens the pipe for writing
                with open(self._pipe_path, 'r') as pipe:
                    message = pipe.read()
                    self._results_queue.put(message, block=True)

        def stop(self):
            self._stop_event.set()
As you can see, I wanted to use a threading.Event object to stop the loop, but since the open() or read() calls on the named pipe block (until someone opens the pipe for writing, writes to it, and then closes it), the thread never gets a chance to check the event and stop.
I didn't want to use non-blocking mode for the named pipe, because blocking is actually what I want, in the sense that I want to wait for someone to open and write to the pipe.
With sockets I'd try something like setting a timeout flag on the socket, but I couldn't find any way of doing this for named pipes. I've also considered just killing the thread in cold blood without giving it a chance to stop gracefully, but this doesn't really feel like something I should be doing, and I don't even know if Python provides any way of doing this.
How should I stop this thread properly, so that I can call join() on it afterwards?
The classic way to do this is to create an unnamed pipe that signals shutdown, and to use select to find out which of the two descriptors is ready. select blocks until at least one of the descriptors is ready for reading, and then you can use os.read, which will not block in this case.
Code for demonstration (doesn't handle errors, might leak descriptors):
    import os
    import select
    from threading import Thread

    class PipeReaderThread(Thread):
        def __init__(self, results_queue, pipe_path):
            Thread.__init__(self)
            self._stop_pipe_r, self._stop_pipe_w = os.pipe()
            self._results_queue = results_queue
            # use file descriptors directly to read the file in parts;
            # note that this open blocks until a writer opens the pipe
            self._pipe = os.open(pipe_path, os.O_RDONLY)
            self._buffer = b''

        def run(self):
            while True:
                result = select.select([self._stop_pipe_r, self._pipe], [], [])
                if self._stop_pipe_r in result[0]:
                    os.close(self._stop_pipe_r)
                    os.close(self._stop_pipe_w)
                    os.close(self._pipe)
                    return
                # select above guarantees that this read is non-blocking
                self._buffer += os.read(self._pipe, 4096)
                self._extract_messages_from_buffer()  # left as an exercise

        def stop(self):
            # wake up select() by making the stop pipe readable
            os.write(self._stop_pipe_w, b'c')
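For completeness, here is a fuller sketch of the same idea that can be run end to end. It is only one possible way to fill in the gaps, under several assumptions that are not part of the code above: messages are assumed to be newline-delimited, the FIFO is opened with O_NONBLOCK so that the open itself cannot hang, and the FIFO is reopened when a writer closes its end (which shows up as an empty read). The expectation that select then waits quietly for the next writer is based on Linux behaviour and may differ on other platforms. The demo() helper and the file name demo.fifo are made up for illustration.

```python
import os
import queue
import select
import tempfile
from threading import Thread


class PipeReaderThread(Thread):
    def __init__(self, results_queue, pipe_path):
        super().__init__()
        self._results_queue = results_queue
        self._pipe_path = pipe_path
        self._stop_pipe_r, self._stop_pipe_w = os.pipe()
        self._buffer = b""

    def _open_fifo(self):
        # O_NONBLOCK lets the open return at once even if no writer exists yet.
        return os.open(self._pipe_path, os.O_RDONLY | os.O_NONBLOCK)

    def _extract_messages_from_buffer(self):
        # Assumption: one message per line. Keep any trailing partial line.
        *complete, self._buffer = self._buffer.split(b"\n")
        for line in complete:
            self._results_queue.put(line.decode())

    def run(self):
        fifo = self._open_fifo()
        try:
            while True:
                readable, _, _ = select.select([self._stop_pipe_r, fifo], [], [])
                if self._stop_pipe_r in readable:
                    return
                chunk = os.read(fifo, 4096)
                if not chunk:
                    # Empty read: every writer has closed its end. Open a fresh
                    # descriptor before closing the old one, so the FIFO is
                    # never left without a reader.
                    new_fifo = self._open_fifo()
                    os.close(fifo)
                    fifo = new_fifo
                    continue
                self._buffer += chunk
                self._extract_messages_from_buffer()
        finally:
            os.close(fifo)
            os.close(self._stop_pipe_r)
            os.close(self._stop_pipe_w)

    def stop(self):
        # Call at most once: run() closes both ends of the stop pipe on exit.
        os.write(self._stop_pipe_w, b"c")


def demo():
    # Hypothetical helper: write two messages, read them back, stop the thread.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.fifo")
        os.mkfifo(path)
        results = queue.Queue()
        reader = PipeReaderThread(results, path)
        reader.start()
        # Opening for writing blocks until the reader thread has opened the FIFO.
        with open(path, "wb") as writer:
            writer.write(b"hello\nworld\n")
        messages = [results.get(timeout=5), results.get(timeout=5)]
        reader.stop()
        reader.join(timeout=5)
        return messages, reader.is_alive()


if __name__ == "__main__":
    print(demo())
```

The thread still sleeps inside select while nothing is happening, so the "wait until someone writes" behaviour from the question is kept; the non-blocking flag only prevents open() and os.read() from hanging where stop() cannot reach them.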