
Multiprocessing pool.join() hangs under some circumstances


I am trying to create a simple producer/consumer pattern in Python using multiprocessing. It works, but it hangs on pool.join().

from multiprocessing import Pool, Queue

que = Queue()


def consume():
    while True:
        element = que.get()      # blocks until an item is available
        if element is None:      # None is the shutdown sentinel
            print('break')
            break
    print('Consumer closing')


def produce(nr):
    que.put([nr] * 1000000)
    print('Producer {} closing'.format(nr))


def main():
    p = Pool(5)
    p.apply_async(consume)    # the consumer occupies one of the pool workers
    p.map(produce, range(5))
    que.put(None)             # sentinel meant to stop the consumer
    print('None')
    p.close()
    p.join()                  # hangs here


if __name__ == '__main__':
    main()

Sample output:

~/Python/Examples $ ./multip_prod_cons.py 
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing

However, it works perfectly when I change one line:

que.put([nr] * 100)

It is 100% reproducible on a Linux system running Python 3.4.3 or Python 2.7.10. Am I missing something?


Solution

There is quite a lot of confusion here. What you are writing is not a producer/consumer scenario but a mess that misuses another pattern, usually referred to as a "pool of workers".

The pool of workers pattern is an application of the producer/consumer one in which a single producer schedules the work and many consumers consume it. In this pattern, the owner of the Pool ends up being the producer while the workers are the consumers.
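For comparison, here is roughly what the pattern looks like when the Pool is used as intended. This is a minimal sketch; the work function and its payload are made up for illustration:

from multiprocessing import Pool


def work(nr):
    # each worker consumes one job and hands its result back
    return sum([nr] * 1000000)


def main():
    p = Pool(5)
    # the parent process produces the jobs; map blocks until all results arrive
    results = p.map(work, range(5))
    p.close()
    p.join()
    print(results)


if __name__ == '__main__':
    main()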

In your example, instead, you have a hybrid solution where one worker ends up being a consumer and the others act as a sort of middleware. The whole design is very inefficient, duplicates most of the logic already provided by the Pool and, more importantly, is very error prone. What you end up suffering from is a deadlock.

Putting an object into a multiprocessing.Queue is an asynchronous operation. It blocks only if the Queue is full, and your Queue is unbounded, so it never blocks.
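You can see the asynchrony yourself: the put below returns almost instantly even though a million-element list still has to travel through the Pipe, because the data is handed to a background feeder thread which pushes it through later. A small demonstration (exact timings will vary):

from multiprocessing import Queue
import time

q = Queue()
start = time.time()
q.put([0] * 1000000)   # returns immediately; a feeder thread does the real work
print('put returned after {:.4f}s'.format(time.time() - start))
element = q.get()      # blocks until the feeder thread has sent everything
print('received {} items'.format(len(element)))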

This means your produce function returns immediately, so the call to p.map is not blocking as you expect it to. The related worker processes instead wait until the actual message goes through the Pipe that the Queue uses as its communication channel.

What happens next is that you terminate your consumer prematurely: you put the None "message" into the Queue, and it gets delivered before all the lists your produce function creates have been pushed through the Pipe.

You notice the issue once you call p.join, but the real situation is the following.

• the p.join call is waiting for all the worker processes to terminate.
• the worker processes are waiting for the big lists to go through the Queue's Pipe.
• as the consumer worker is long gone, nobody drains the Pipe, which is obviously full.

The issue does not show up if your lists are small enough to go through before you actually send the termination message to the consume function.
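If you want to keep the Queue-based design anyway, one way out of the deadlock is sketched below: run the consumer in a dedicated Process and let every producer send its own sentinel after its data, relying on the fact that puts issued from a single process are delivered in order. This is only a sketch, and it assumes Linux, where the global Queue is inherited by the children via fork, as in your example.

from multiprocessing import Pool, Process, Queue

que = Queue()


def consume(n_producers):
    finished = 0
    while finished < n_producers:
        element = que.get()    # drains the data as it arrives
        if element is None:
            finished += 1      # one sentinel per producer
    print('Consumer closing')


def produce(nr):
    que.put([nr] * 1000000)
    que.put(None)  # sent after this producer's data, so it cannot overtake it
    print('Producer {} closing'.format(nr))


def main():
    consumer = Process(target=consume, args=(5,))
    consumer.start()
    p = Pool(5)
    p.map(produce, range(5))
    p.close()
    p.join()          # workers can flush their Pipes because the consumer keeps draining
    consumer.join()


if __name__ == '__main__':
    main()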