How to stop a thread which is blocking on a named pipe in Python?


I have a class which subclasses threading.Thread. Its sole responsibility is putting messages read from a UNIX named pipe into a queue.Queue object (so that other threads can process these values later).

Example code:

from threading import Event, Thread

class PipeReaderThread(Thread):
    def __init__(self, results_queue, pipe_path):
        Thread.__init__(self)
        self._stop_event = Event()
        self._results_queue = results_queue
        self._pipe_path = pipe_path

    def run(self):
        while not self._stop_event.is_set():
            with open(self._pipe_path, 'r') as pipe:
                message = pipe.read()
            self._results_queue.put(message, block=True)

    def stop(self):
        self._stop_event.set()

As you can see, I wanted to use a threading.Event object to stop the loop. But since the open() and read() calls on the named pipe block (until someone opens the pipe for writing, writes to it, and then closes it), the thread never gets the chance to check the event and stop.

I didn't want to use nonblocking mode for the named pipe, as the blocking is actually what I want, in the sense that I want to wait for someone to open and write to the pipe.

With sockets I'd try something like setting a timeout flag on the socket, but I couldn't find any way of doing this for named pipes. I've also considered just killing the thread in cold blood without giving it a chance to stop gracefully, but this doesn't really feel like something I should be doing, and I don't even know if Python provides any way of doing this.

How should I stop this thread properly, so that I could call join() on it afterwards?


Solution

  • The classic way to do this is to add an unnamed pipe that signals shutdown, and to use select to wait on both pipes and see which one is ready.

    select blocks until at least one of the descriptors is ready for reading; you can then call os.read on that descriptor, which will not block in this case.

    Code for demonstration (doesn't handle errors, might leak descriptors):

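    import os
    import select
    from threading import Thread

    class PipeReaderThread(Thread):
        def __init__(self, results_queue, pipe_path):
            Thread.__init__(self)
            # Unnamed pipe used only to wake the thread up when stop() is called.
            self._stop_pipe_r, self._stop_pipe_w = os.pipe()
            self._results_queue = results_queue
            # Use file descriptors directly so the pipe can be read in parts.
            # Note: a plain O_RDONLY open of a FIFO blocks until a writer opens it.
            self._pipe = os.open(pipe_path, os.O_RDONLY)
            self._buffer = b''

        def run(self):
            while True:
                # Wait until either the stop pipe or the named pipe is readable.
                result = select.select([self._stop_pipe_r, self._pipe], [], [])
                if self._stop_pipe_r in result[0]:
                    os.close(self._stop_pipe_r)
                    os.close(self._stop_pipe_w)
                    os.close(self._pipe)
                    return
                # select above guarantees that this read does not block;
                # it returns b'' once every writer has closed the pipe.
                self._buffer += os.read(self._pipe, 4096)
                self._extract_messages_from_buffer() # left as an exercise

        def stop(self):
            # Any byte will do; it only makes the stop pipe readable.
            os.write(self._stop_pipe_w, b'c')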
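
The demonstration above opens the FIFO in the constructor, which blocks until a writer appears, and it does not deal with end-of-file once the writer closes the pipe. Below is a minimal sketch of one way to fill those gaps, not a definitive implementation. It makes several assumptions that are not in the original: the class name FifoReaderThread is hypothetical, messages are taken to be newline-delimited UTF-8 text, and the behaviour of select on a FIFO with no writer is the Linux one (other systems may differ).

```python
import os
import select
from threading import Thread


class FifoReaderThread(Thread):
    """Hypothetical variant: reads newline-delimited messages from a FIFO."""

    def __init__(self, results_queue, pipe_path):
        super().__init__()
        self._results_queue = results_queue
        self._pipe_path = pipe_path
        # Unnamed pipe used only to wake select() up when stop() is called.
        self._stop_r, self._stop_w = os.pipe()
        self._buffer = b''

    def _open_fifo(self):
        # O_NONBLOCK lets open() return at once even if no writer exists yet.
        return os.open(self._pipe_path, os.O_RDONLY | os.O_NONBLOCK)

    def run(self):
        fifo = self._open_fifo()
        try:
            while True:
                readable, _, _ = select.select([self._stop_r, fifo], [], [])
                if self._stop_r in readable:
                    return
                try:
                    chunk = os.read(fifo, 4096)
                except BlockingIOError:
                    # A writer is connected but has not written anything yet.
                    continue
                if not chunk:
                    # EOF: every writer has closed. Open a fresh descriptor
                    # first, then close the old one, so select() waits for
                    # the next writer instead of reporting EOF in a loop.
                    new_fifo = self._open_fifo()
                    os.close(fifo)
                    fifo = new_fifo
                    continue
                self._buffer += chunk
                # Assumed framing: one message per line; keep the tail.
                *lines, self._buffer = self._buffer.split(b'\n')
                for line in lines:
                    self._results_queue.put(line.decode())
        finally:
            os.close(fifo)

    def stop(self):
        # Call once: wake the thread, wait for it, then release the stop pipe.
        os.write(self._stop_w, b'x')
        self.join()
        os.close(self._stop_r)
        os.close(self._stop_w)
```

In this sketch stop() also joins the thread, so the stop pipe is closed only after run() has returned; calling join() again afterwards is harmless. The thread still sleeps inside select while no writer is connected, which keeps the "wait until someone writes" behaviour the question asks for without blocking in open() or read().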