I am trying to create a simple producer/consumer pattern in Python using `multiprocessing`. It works, but it hangs on `p.join()`.
```python
from multiprocessing import Pool, Queue

que = Queue()

def consume():
    while True:
        element = que.get()
        if element is None:
            print('break')
            break
    print('Consumer closing')

def produce(nr):
    que.put([nr] * 1000000)
    print('Producer {} closing'.format(nr))

def main():
    p = Pool(5)
    p.apply_async(consume)
    p.map(produce, range(5))
    que.put(None)
    print('None')
    p.close()
    p.join()

if __name__ == '__main__':
    main()
```
Sample output:

```
~/Python/Examples $ ./multip_prod_cons.py
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing
```
However, it works perfectly when I change one line:

```python
que.put([nr] * 100)
```
It is 100% reproducible on a Linux system running Python 3.4.3 or Python 2.7.10. Am I missing something?
There is quite a lot of confusion here. What you are writing is not a producer/consumer scenario but a mess that misuses another pattern, usually referred to as a "pool of workers".
The pool of workers pattern is an application of producer/consumer in which a single producer schedules the work and many consumers consume it. In this pattern, the owner of the `Pool` ends up being the producer while the workers are the consumers.
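As an illustrative sketch of that pattern (the `work` function and its payload are hypothetical, not taken from your code): the parent produces the work by scheduling it, and the workers' return values travel back through the machinery the `Pool` already provides.

```python
from multiprocessing import Pool

def work(item):
    # each worker "consumes" one unit of work and returns its result
    return item * item

def main():
    p = Pool(4)
    # the owner of the Pool "produces" the work by scheduling it;
    # map blocks until all results have been delivered back
    results = p.map(work, range(10))
    p.close()
    p.join()
    print(results)

if __name__ == '__main__':
    main()
```

No explicit `Queue` appears anywhere: the `Pool` owns the communication channels and drains them for you.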
In your example you instead have a hybrid solution where one worker ends up being a consumer and the others act as a sort of middleware. The whole design is very inefficient: it duplicates most of the logic already provided by the `Pool` and, more importantly, it is very error prone. What you end up suffering from is a deadlock.
Putting an object into a `multiprocessing.Queue` is an asynchronous operation. It blocks only if the `Queue` is full, and your `Queue` has infinite size.
This means your `produce` function returns immediately, therefore the call to `p.map` is not blocking as you expect it to. The related worker processes instead wait until the actual message goes through the `Pipe` which the `Queue` uses as its communication channel.
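You can observe this asynchrony directly: even for a large payload, `put` returns almost immediately, because a background feeder thread does the actual work of pushing the pickled data through the `Pipe`. A minimal sketch:

```python
import time
from multiprocessing import Queue

q = Queue()

big = [0] * 1000000
start = time.time()
q.put(big)            # returns right away; a feeder thread ships the data
elapsed = time.time() - start
print('put returned after {:.4f}s'.format(elapsed))

# The payload only materialises on the receiving end once the feeder
# thread has pushed it through the Pipe; get() blocks until then.
received = q.get()
print('received {} elements'.format(len(received)))
```

This is exactly why `p.map` can return while the workers are still busy delivering their lists.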
What happens next is that you terminate your consumer prematurely: the `None` "message" you put in the `Queue` gets delivered before all the lists your `produce` function creates have been pushed through the `Pipe`.
You notice the issue once you call `p.join`, but the real situation is the following:

- The `p.join` call is waiting for all the worker processes to terminate.
- One of the workers is still blocked pushing its giant list through the `Queue`'s `Pipe`.
- The `consume` function has already quit after receiving `None`, so nothing is draining the `Pipe`, which is obviously full.

The issue does not show if your lists are small enough to go through before you actually send the termination message to the `consume` function.
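One way to repair the design, sketched on your example: let the workers return their lists and make the parent process the consumer. The `Pool` then owns the transfer and drains its own channels, so no hand-rolled `Queue` and no `None` sentinel are needed (the use of `imap_unordered` here is my choice; plain `map` works too if you don't need results as they arrive):

```python
from multiprocessing import Pool

def produce(nr):
    # return the data instead of pushing it into a shared Queue;
    # the Pool delivers each result back to the parent
    return [nr] * 1000000

def main():
    p = Pool(5)
    # the parent consumes the results as the workers finish producing them
    for result in p.imap_unordered(produce, range(5)):
        print('Consumed a list of {} copies of {}'.format(len(result), result[0]))
    p.close()
    p.join()

if __name__ == '__main__':
    main()
```

Because the `Pool` keeps reading results until every task is accounted for, no worker can be left blocked on a full `Pipe` and `p.join()` returns cleanly.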