Tags: python, locking, python-multiprocessing, terminate

Unexpected behavior after process termination in python


I have a multiprocessing program which is designed to terminate all other processes once the first process is done, using a multiprocessing.Event() to signal that the process has finished, and a multiprocessing.Queue() to store the return value.

However, the program sometimes gets stuck in the queue.get() call, as if the put() call had not completed yet.

Here is a simplified version of the problem, with a single subprocess.


import multiprocessing

def foo(is_done, queue):
    queue.put(1)
    is_done.set()

    
if __name__ == "__main__":
    while True:
        is_done = multiprocessing.Event()
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=foo,args=(is_done,queue))
        p.start()
        is_done.wait()
        p.terminate()
        print(queue.get())
        

As the event is set AFTER the value is put in the queue, I expected the process to be terminated only after the value had been added. I have noticed that a small delay before termination fixes the problem (maybe because put() is then not interrupted midway?). I am curious to know why that is.


Solution

  • You need to understand two key points.

    The first is that multiprocessing.Queue is implemented internally on top of a Pipe, which means it is a class for inter-process communication that follows the queue protocol. The put method is a "get in line" operation, so to speak; the item is not immediately "in the queue" but stays in the sender's process (handed off to a background feeder thread) until inter-process communication actually occurs, i.e., until someone on the other side reads the data. This is why we cannot join a subprocess while the queue is not empty. (NOTE: The above description may not apply when the item is small enough to fit in the pipe's buffer.)

    The second is that Process.terminate is a method for forcefully terminating a process, not for gracefully shutting it down.

    The documentation explicitly states:

    If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other process.

    This is because items that are not yet "in the queue" may be lost, along with the process that is still holding them.

    Because of this, there is no guarantee that your code will work as expected. Even when it appears to, that is merely coincidental.

    Here is how to fix it:

    import multiprocessing
    
    
    def foo(is_done, queue):
        queue.put(1)
        is_done.set()
    
    
    if __name__ == "__main__":
        while True:
            is_done = multiprocessing.Event()
            queue = multiprocessing.Queue()
            p = multiprocessing.Process(target=foo, args=(is_done, queue))
            p.start()
            is_done.wait()
            # p.terminate()  # <-- Do not terminate BEFORE the queue is empty.
            print(queue.get())
            p.join()  # You can safely join the process AFTER the queue is empty.
            p.close()
    

    There is one more point worth mentioning. The revised code still contains a potential bug.

    This behavior is quite confusing, but it is also possible for the put method to return before the item has "gotten in line"; i.e., the Event may be set at a moment when the queue still looks empty.

    This becomes a problem especially when putting a large item and performing a non-blocking get.

    import multiprocessing
    
    def foo(is_done, queue):
        queue.put(list(range(1_000_000)))  # A big item.
        is_done.set()
    
    if __name__ == "__main__":
        while True:
            is_done = multiprocessing.Event()
            queue = multiprocessing.Queue()
            p = multiprocessing.Process(target=foo, args=(is_done, queue))
            p.start()
            is_done.wait()
            _ = queue.get(block=False)  # Non-blocking get.
            print("Success!")
            p.join()
            p.close()
    

    When you run this, you will eventually observe an Empty exception. This is likely because put() merely hands the item to a background feeder thread, so there can be a slight delay before that thread actually gets the item "in line".

    Note that since your code uses a blocking get, this should not be a problem for you: it will simply wait until the item is in line.

    If the revised code still does not work as expected, consider using multiprocessing.Manager().Queue() instead. A Manager.Queue lives in its own server process, which ensures the item is "in the queue" as soon as put returns. This makes the behavior more predictable in many respects; however, it is noticeably slower, so which is better depends on your use case.