Tags: python, multiprocessing, python-multiprocessing

How to close a multiprocessing queue from a parent (consumer) process


When using a multiprocessing queue to communicate between processes, many articles recommend sending a terminate message on the queue. However, if a child process is the producer, it may fail unexpectedly, leaving the consumer with no notification that no more messages are coming.

However, the parent process can be notified when a child process dies. It seems it should be possible for it to then notify a worker thread in the parent process to quit and stop expecting more messages. But how?
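
For reference, the notification I have in mind is the child's sentinel handle: multiprocessing.Process.sentinel becomes ready when the child exits, and multiprocessing.connection.wait can block on it. A rough sketch of what I mean (watch_children is just an illustrative name, not part of my code below):

    from multiprocessing.connection import wait

    def watch_children(procs, messageQ):
        # Block until every producer child has exited (normally or not),
        # then tell the consumer thread to stop waiting for messages.
        remaining = {p.sentinel: p for p in procs}
        while remaining:
            for s in wait(list(remaining)):   # readiness means the child exited
                proc = remaining.pop(s)
                proc.join()                   # reap it so exitcode is set
                print("producer", proc.pid, "exited with", proc.exitcode)
        messageQ.put("TERMINATE")             # no producers left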

    multiprocessing.Queue.close()

...doesn't notify consumers (really? wait, what?!)

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")

... doesn't let me wait for pending work to complete.

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")
        # messageQ.close()
        messageQ.join_thread()  # Wait for worker to complete

... fails because the queue is not yet closed.

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE")
        messageQ.close()
        messageQ.join_thread()  # Wait for worker to complete

... seems like it should work, but fails in the worker with a TypeError exception:

    msg = messageQ.get()
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
            while not quit:
                try:
                    msg = messageQ.get(block=True, timeout=0.5)
                except Empty:   # queue.Empty from the queue module
                    continue

... is terrible in that it forces an unnecessary trade-off between shutdown latency and CPU usage from polling.
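
For completeness, this is roughly what the polling version of the worker would have to look like; messageQ is the same queue as in the full example below, while the quit Event and the extra imports are things I would have to add:

    import queue       # for queue.Empty
    import threading

    quit = threading.Event()

    def polling_worker():
        # Poll with a timeout so the thread can notice quit being set,
        # at the cost of up to 0.5 s of shutdown latency per check.
        while not quit.is_set():
            try:
                msg = messageQ.get(block=True, timeout=0.5)
            except queue.Empty:
                continue
            print(msg)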

Full example

import multiprocessing
import threading


def producer(messageQ):
    messageQ.put("1")
    messageQ.put("2")
    messageQ.put("3")

if __name__ == '__main__':
    messageQ = multiprocessing.Queue()
    def worker():
        try:
            while True:
                msg = messageQ.get()
                print(msg)
                if msg=="TERMINATE": return
                # messageQ.task_done()
        finally:
            print("Worker quit")
            # messageQ.close()  # End thread
            # messageQ.join_thread()

    thr = threading.Thread(target=worker, 
                           daemon=False)   # The work queue is precious.
    thr.start()

    def onProcessQuit(): # Notify worker that we are done.
        messageQ.put("TERMINATE") # Notify worker we are done
        messageQ.close()        # No more messages
        messageQ.join_thread()  # Wait for worker to complete

    def runProcess():
        proc = multiprocessing.Process(target=producer, args=(messageQ,))
        proc.start()
        proc.join()
        print("runProcess quitting ...")
        onProcessQuit()
        print("runProcess quitting .. OK")

    runProcess()

Solution

  • If you are concerned about the producer process not completing normally, then I am not sure what your question is because your code as is should work except for a few corrections: (1) it is missing an import statement, (2) there is no call to runProcess and (3) your worker thread is incorrectly a daemon thread (as such it may end up terminating before it has had a chance to process all the messages on the queue).

    As a personal preference (and not a correction), I would also use None as the special sentinel message instead of TERMINATE, and I would remove some extraneous queue calls that you don't really need (I don't see your explicit closing of the queue accomplishing anything that is necessary).

    These are the changes:

    def producer(messageQ):
        messageQ.put("1")
        messageQ.put("2")
        messageQ.put("3")
    
    if __name__ == '__main__':
        import multiprocessing
        import threading
    
        SENTINEL = None
    
        def worker():
            try:
                while True:
                    msg = messageQ.get()
                    if msg is SENTINEL:
                        return # No need to print the sentinel
                    print(msg)
            finally:
                print("Worker quit")
    
    
        def onProcessQuit(): # Notify worker that we are done.
            messageQ.put(SENTINEL) # Notify worker we are done
    
        def runProcess():
            proc = multiprocessing.Process(target=producer, args=(messageQ,))
            proc.start()
            proc.join()
            print("runProcess quitting ...")
            onProcessQuit()
            print("runProcess quitting .. OK")
            thr.join()
    
        messageQ = multiprocessing.Queue()
        thr = threading.Thread(target=worker)   # The work queue is precious.
        thr.start()
        runProcess()
    

    Prints:

    1
    2
    3
    runProcess quitting ...
    runProcess quitting .. OK
    Worker quit
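
    As for the producer not completing normally: because onProcessQuit() only runs after proc.join() has returned, the sentinel still gets put even if the child died with an error, so the worker still shuts down. A quick way to check (crashing_producer is my own test function, not part of your code):

        def crashing_producer(messageQ):
            messageQ.put("1")
            raise RuntimeError("producer died unexpectedly")

    Substituting this for producer in runProcess should still print 1 followed by the shutdown messages (plus the child's traceback on stderr), with proc.exitcode being 1 instead of 0.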