python, python-multiprocessing

multiprocessing.Process not exiting on empty Queue


I have a large number of objects to iterate over, and I thought that multiprocessing would speed up the work considerably. However, this simple example seems to hang once I increase the core count.

It hangs on the p.join() line, and if I interrupt the run and check, q_in.empty() returns True and the output queue holds the expected number of items.

What's causing it to hang?

from multiprocessing import Process, Queue
import time

def worker_func(q_in, q_out, w):
    time.sleep(1)
    while not q_in.empty():
        # Simple code standing in for more complex operation
        q_out.put(str(w) + '_' + str(q_in.get()))

def setup_func(x):
    q_in = Queue()
    for w in range(x):
        q_in.put(w)

    q_out = Queue()
    return((q_in, q_out))

def test_func(num_cores, q_in, q_out):
    processes = []

    for w in range(num_cores):
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    output_ls = []
    while not q_out.empty():
        output_ls.append(q_out.get())

    return(output_ls)

q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2

q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5

Solution

  • You have multiple processes pulling from the same queue. The queue may still hold data when a worker calls q_in.empty(), but by the time that worker reaches the blocking q_in.get(), another process may already have consumed the last item, leaving it waiting forever. The documentation for multiprocessing.Queue.empty() warns: "Because of multithreading/multiprocessing semantics, this is not reliable."

    An alternative approach is to put end-of-work sentinels at the end of the queue, one per process. When a worker sees the sentinel, it exits. In your case, None is a good choice.

    from multiprocessing import Process, Queue
    import time
    
    def worker_func(q_in, q_out, w):
        time.sleep(1)
        while True:
            msg = q_in.get()
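            # A None sentinel means there is no more work; exit the loop.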
            if msg is None:
                break
            q_out.put(str(w) + '_' + str(msg))
    
    def setup_func(x):
        q_in = Queue()
        for w in range(x):
            q_in.put(w)
    
        q_out = Queue()
        return((q_in, q_out))
    
    def test_func(num_cores, q_in, q_out):
        processes = []
    
        for w in range(num_cores):
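            # One sentinel per worker, so every process eventually receives an end marker.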
            q_in.put(None)
            p = Process(target=worker_func, args=(q_in, q_out, w))
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        output_ls = []
        while not q_out.empty():
            output_ls.append(q_out.get())
    
        return(output_ls)
    
    q_in, q_out = setup_func(1000)
    test_func(1, q_in, q_out) # Works as before for num_cores = 1 or 2
    
    q_in, q_out = setup_func(1000)
    test_func(5, q_in, q_out) # Now completes instead of hanging for num_cores = 5
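
    If you would rather not touch the producer side, another option (a sketch, not part of the original answer, with an arbitrary one-second grace period) is to let each worker give up once the queue has stayed empty for a short while, using get() with a timeout and catching queue.Empty. This worker_func is a drop-in replacement for the one above:

    from queue import Empty

    def worker_func(q_in, q_out, w):
        while True:
            try:
                # Block for at most one second waiting for the next item.
                msg = q_in.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; assume the work is finished.
                break
            q_out.put(str(w) + '_' + str(msg))

    The trade-off is that a timeout can fire early if producers are slower than the grace period, whereas sentinels give a deterministic shutdown signal. Also note the caveat from the multiprocessing programming guidelines: a process that has put items on a queue will not terminate until those items have been flushed to the underlying pipe, so for large outputs it is safer to drain q_out before (or while) joining the workers rather than only after.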