Search code examples
pythonmultiprocessing

Multiprocessing code behaves differently when commenting out one print statement


This is my code:

from random import random
from multiprocessing import Process
from multiprocessing import Queue
import time


def new(shared_queue):
    print('Consumer: Running', flush=True)
    while shared_queue.qsize() > 0:
        shared_queue.get()
        print(shared_queue.qsize())
    print('Consumer: Done', flush=True)


if __name__ == '__main__':
    start_time = time.time()

    # Init Queue
    queue = Queue()
    for _ in range(50000):
        # generate a value
        value = random()
        # add to the queue
        queue.put(value)

    print('Init Queue size', queue.qsize())

    p0 = Process(target=new, args=(queue,))
    p0.start()
    p1 = Process(target=new, args=(queue,))
    p1.start()
    p2 = Process(target=new, args=(queue,))
    p2.start()

    p0.join()
    p1.join()
    p2.join()

    print("Done in --- %s seconds ---" % (time.time() - start_time))

When I run this, I get the result I expect:

Consumer: Done
Consumer: Done
Consumer: Done
Done in --- 5.304457664489746 seconds ---

However, when I only comment out the line

print(shared_queue.qsize())

I only get this result:

Init Queue size 50000
Consumer: Running
Consumer: Running
Consumer: Running
Consumer: Done

Why don't I see the other Consumer: Done lines and the Done in --- xxx seconds ---?


Solution

  • Your problem is that get is too fast and print too slow. If the consumers get elements super fast may occur that one consumer, let's say p1 checks shared_queue.qsize() > 0 and by the time p1 arrives to shared_queue.get() other consumer has called shared_queue.get() for the last element.

    If your example has not concurrent producers and consumers (if your queue is fully defined by the time the first consumer starts) instead of use timeout as AhmedAEK suggested, you can use shared_queue.get(block=False), which will raise and exception that you will have to catch as AhmedAEK showed:

    def new(shared_queue):
        print('Consumer : Running', flush=True)
        while shared_queue.qsize() > 0:
            try:
                shared_queue.get(block=False)
            except queue.Empty:
                pass
            # print(shared_queue.qsize())
        print('Consumer : Done', flush=True)