This is my code:
from random import random
from multiprocessing import Process
from multiprocessing import Queue
import time

def new(shared_queue):
    print('Consumer: Running', flush=True)
    while shared_queue.qsize() > 0:
        shared_queue.get()
        print(shared_queue.qsize())
    print('Consumer: Done', flush=True)

if __name__ == '__main__':
    start_time = time.time()
    # Init Queue
    queue = Queue()
    for _ in range(50000):
        # generate a value
        value = random()
        # add to the queue
        queue.put(value)
    print('Init Queue size', queue.qsize())
    p0 = Process(target=new, args=(queue,))
    p0.start()
    p1 = Process(target=new, args=(queue,))
    p1.start()
    p2 = Process(target=new, args=(queue,))
    p2.start()
    p0.join()
    p1.join()
    p2.join()
    print("Done in --- %s seconds ---" % (time.time() - start_time))
When I run this, I get the result I expect:
Consumer: Done
Consumer: Done
Consumer: Done
Done in --- 5.304457664489746 seconds ---
However, when I comment out only the line print(shared_queue.qsize()), I get only this result:
Init Queue size 50000
Consumer: Running
Consumer: Running
Consumer: Running
Consumer: Done
Why don't I see the other Consumer: Done lines and the Done in --- xxx seconds --- line?
Your problem is that get is too fast and print is too slow. If the consumers get elements very fast, it can happen that one consumer, say p1, checks shared_queue.qsize() > 0, and by the time p1 reaches shared_queue.get(), another consumer has already called shared_queue.get() for the last element. Since block=True is the default, p1 then waits forever inside get() for an item that never arrives, so it never prints Consumer: Done and the join() calls in the parent never return.
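You can see the blocking behaviour in isolation with a minimal sketch (this snippet is only an illustration, not part of your code; the timeout=2 is there purely so it returns instead of hanging forever):

import queue
from multiprocessing import Queue

q = Queue()
try:
    # block=True is the default; without a timeout this call would hang forever
    q.get(timeout=2)
except queue.Empty:
    print('get() gave up after 2 seconds: the queue was empty')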
If your example has no concurrent producers and consumers (i.e. your queue is fully populated by the time the first consumer starts), then instead of using a timeout as AhmedAEK suggested, you can use shared_queue.get(block=False), which will raise an exception that you have to catch, as AhmedAEK showed:
import queue  # for the queue.Empty exception

def new(shared_queue):
    print('Consumer : Running', flush=True)
    while shared_queue.qsize() > 0:
        try:
            shared_queue.get(block=False)
        except queue.Empty:
            pass
        # print(shared_queue.qsize())
    print('Consumer : Done', flush=True)
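Catching queue.Empty and doing nothing works here because the while condition re-checks qsize() on the next iteration and exits once the queue is drained. If you do have concurrent producers, or you are on a platform where qsize() is unreliable (it can raise NotImplementedError on macOS), a common alternative is the sentinel pattern. Below is a minimal sketch of that approach, assuming a fixed, known number of consumers; it is not part of AhmedAEK's answer, just a standard idiom:

from random import random
from multiprocessing import Process, Queue

SENTINEL = None  # marker value; anything the producer never puts as real data works

def consumer(shared_queue):
    print('Consumer: Running', flush=True)
    while True:
        item = shared_queue.get()  # blocking get is safe: a sentinel always arrives
        if item is SENTINEL:
            break
    print('Consumer: Done', flush=True)

if __name__ == '__main__':
    shared_queue = Queue()
    for _ in range(50000):
        shared_queue.put(random())
    n_consumers = 3
    for _ in range(n_consumers):
        shared_queue.put(SENTINEL)  # one sentinel per consumer so each loop terminates
    processes = [Process(target=consumer, args=(shared_queue,)) for _ in range(n_consumers)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

Each consumer breaks as soon as it sees a sentinel, so every consumer terminates exactly once without ever touching qsize().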