Search code examples
python-3.xmultithreadingmultiprocessingpython-multiprocessingpython-multithreading

How to use Queue for multiprocessing with Python?


This program works fine, It should output: 0 1 2 3.

from multiprocessing import Process, Queue

NTHREADS = 4

def foo(queue, id):
    queue.put(id)

if __name__ == '__main__':
    queue = Queue()
    procs = []
    for id in range(NTHREADS):
        procs.append(Process(target=foo, args=(queue, id)))

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    while not queue.empty():
        print(queue.get())

But not with this one. I think it stalls after join().

from multiprocessing import Process, Queue
from PIL import Image

NTHREADS = 4

def foo(queue):
    img = Image.new('RGB', (200,200), color=(255,0,0))
    queue.put(img)

if __name__ == '__main__':
    queue = Queue()
    procs = []
    for i in range(NTHREADS):
        procs.append(Process(target=foo, args=(queue,)))

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    while not queue.empty():
        print(queue.get().size)

Why? How can I reach the end? How can I get my image? I'd like to work on 4 images in parallel and then merge them into one final image.


Solution

  • Queues are complicated beasts under the covers. When an (pickle of an) object is put on a queue, parts of it are fed into the underlying OS interprocess communication mechanism, but the rest is left in an in-memory Python buffer, to avoid overwhelming the OS facilities. The stuff in the memory buffer is fed into the OS mechanism as the receiving end makes room for more by taking stuff off the queue.

    A consequence is that a worker process cannot end before its memory buffers (feeding into queues) are empty.

    In your first program, pickles of integers are so tiny that memory buffers don't come into play. A worker feeds the entire pickle to the OS in one gulp, and the worker can exit then.

    But in your second program, the pickles are much larger. A worker sends part of the pickle to the OS, then waits for the main program to take it off the OS mechanism, so it can feed the next part of the pickle. Since your program never takes anything off the queue before calling .join(), the workers wait forever.

    So, in general, this is the rule: never attempt to .join() until all queues have been drained.

    Note this from the docs:

    Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.

    Also, queue.empty() is a poor way to test for this. That can only tell you if data is on the queue at the instant it happens to execute. In parallel processing, that's at best a probabilistic approximation to the truth. In your second example, you know exactly how many items you expect to get from the queue, so this way would be reliable:

    for proc in procs:
        proc.start()
    
    for i in range(NTHREADS):
        print(queue.get().size)
    
    for proc in procs: # join AFTER queue is drained
        proc.join()