Tags: python, python-3.x, multiprocessing, fork

Python multiprocessing.Queue not receiving puts from forked processes


I am creating a fixed number of forked child processes and attempting to have each of them return a result via a multiprocessing.Queue. This leads to some unexpected behaviour.

import multiprocessing
import os

def main():
    n_workers = 4

    q = multiprocessing.Queue(n_workers)

    for i in range(n_workers):
        if os.fork() == 0:  # os.fork() returns 0 in the child process
            print(f"child {i} put {i}")
            q.put(i)
            print(f"child {i} exiting")
            os._exit(0)  # exit the child without interpreter cleanup

    for i in range(n_workers):
        res = q.get()
        print(f"parent got {res}")

    print("parent exiting")


if __name__ == "__main__":
    main()

When I run this, all of the children enqueue their results and terminate, but the parent process hangs:

child 0 put 0
child 1 put 1
child 2 put 2
child 3 put 3
child 0 exiting
child 1 exiting
child 2 exiting
child 3 exiting
parent got 0

Solution

  • The problem is the os._exit(0) call immediately after putting data into the queue.

    The multiprocessing docs explain how data is added to the queue:

    When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

    Because the child was created with a bare os.fork(), exiting it via os._exit(0) is necessary (as opposed to sys.exit(0), which would run the interpreter's normal shutdown in the child, including atexit handlers and flushing of stdio buffers inherited from the parent), but os._exit() performs no cleanup at all. If the background feeder thread has not yet flushed the pickled data to the pipe when the child exits, that data is lost! (A variant that avoids the manual flush entirely is sketched after the corrected code below.)

    The solution is to call close() followed by join_thread():

    import multiprocessing
    import os
    
    def main():
        n_workers = 4
    
        q = multiprocessing.Queue(n_workers)
    
        for i in range(n_workers):
            if os.fork() == 0:
                print(f"child {i} put {i}")
                q.put(i)
                print(f"child {i} exiting")
    
                q.close()  # indicate nothing else will be queued by this process
                q.join_thread()  # wait for the background thread to flush the data
    
                os._exit(0)
    
        for i in range(n_workers):
            res = q.get()
            print(f"parent got {res}")
    
        print("parent exiting")
    
    
    if __name__ == "__main__":
        main()
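
    If you would rather not manage the fork and the flush by hand, the same pattern can be written with multiprocessing.Process: children started that way run the library's own exit handlers on normal exit, which flush and join the queue's feeder thread for you. A minimal sketch of that approach (the worker function and its name here are illustrative, not part of the original code):

    import multiprocessing

    def worker(q, i):
        # A Process-managed child flushes and joins the queue's feeder
        # thread on normal exit, so no explicit close()/join_thread()
        # is needed here.
        print(f"child {i} put {i}")
        q.put(i)
        print(f"child {i} exiting")

    def main():
        n_workers = 4
        q = multiprocessing.Queue(n_workers)

        workers = [
            multiprocessing.Process(target=worker, args=(q, i))
            for i in range(n_workers)
        ]
        for w in workers:
            w.start()

        # Drain the queue before joining: a child whose feeder thread
        # is still blocked writing to a full pipe cannot exit until the
        # parent reads, so get() everything first, then join().
        for _ in range(n_workers):
            print(f"parent got {q.get()}")

        for w in workers:
            w.join()

        print("parent exiting")

    if __name__ == "__main__":
        main()

    This also works under the spawn start method (the default on macOS and Windows), where raw os.fork() is not an option at all.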