I have a large number of objects to iterate over, and I thought that multiprocessing would speed up the work considerably. However, this simple example seems to hang once I increase the core count.
It hangs on the p.join()
line, and if I terminate and check, q_in.empty()
returns True
and the output queue has the appropriate number of items.
What's causing it to hang?
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
    # (Question's reproduction -- intentionally buggy.)
    # Presumably the sleep is here to let the parent finish filling q_in
    # before the empty() poll below -- TODO confirm intent.
    time.sleep(1)
    # BUG: empty() -> get() is not atomic. With several workers, another
    # process can consume the last item between this check and the get(),
    # leaving get() blocked forever and the process unjoinable.
    while not q_in.empty():
        # Simple code standing in for more complex operation
        q_out.put(str(w) + '_' + str(q_in.get()))
def setup_func(x):
    """Build the two queues: an input queue pre-loaded with 0..x-1 and an
    empty output queue. Returns the pair (q_in, q_out)."""
    input_queue = Queue()
    output_queue = Queue()
    for item in range(x):
        input_queue.put(item)
    return input_queue, output_queue
def test_func(num_cores, q_in, q_out):
    # (Question's reproduction of the hang.)
    # Start one worker_func process per requested core.
    processes = []
    for w in range(num_cores):
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()
    # Hangs here with several workers: a worker stuck in q_in.get()
    # (see the empty()/get() race in worker_func) never exits, so
    # join() waits forever.
    for p in processes:
        p.join()
    # Only reached when every worker exited; collect whatever they produced.
    output_ls = []
    while not q_out.empty():
        output_ls.append(q_out.get())
    return(output_ls)
# Demonstration of the problem with 1000 work items.
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out)  # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out)  # This hangs for num_cores = 5
You have multiple processes pulling from the same queue. The queue may have data, but by the time you get around to fetching it, another process has already consumed it — so the `q_in.empty()` check followed by `q_in.get()` is a race, and the `get()` can block forever. The documentation for `multiprocessing.Queue.empty` warns: "Because of multithreading/multiprocessing semantics, this is not reliable."
An alternate approach is to put a process-end sentinel at the end of the queue, one per process. When a process sees the sentinel, it exits. In your case, None
is a good choice of sentinel.
from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
    """Consume items from q_in until a None sentinel, tagging each with w.

    The worker blocks in q_in.get() until an item (or the sentinel)
    arrives, so no unreliable q_in.empty() polling is needed.

    Args:
        q_in: queue of work items; a None entry signals shutdown.
        q_out: queue receiving one '<w>_<item>' string per work item.
        w: this worker's numeric id, used to tag its output.
    """
    # NOTE: the time.sleep(1) from the original version is gone -- it was
    # only ever needed to let q_in fill before the empty() poll in the
    # buggy version; with a blocking get() it just delayed every worker.
    while True:
        msg = q_in.get()
        if msg is None:
            break  # sentinel: this worker's share of the work is done
        q_out.put(str(w) + '_' + str(msg))
def setup_func(x):
    """Return the pair (q_in, q_out) where q_in holds the integers
    0..x-1 and q_out starts out empty."""
    q_in, q_out = Queue(), Queue()
    for n in range(x):
        q_in.put(n)
    return (q_in, q_out)
def test_func(num_cores, q_in, q_out):
    """Run num_cores worker processes over q_in and return their output.

    Appends one None sentinel per worker so each worker_func exits
    cleanly, and drains q_out *while* waiting on the workers: a child
    that has put items on a queue does not terminate until its feeder
    thread has flushed them to the underlying pipe, so joining before
    draining can deadlock once the pipe buffer fills (see the
    multiprocessing "Programming guidelines").

    Args:
        num_cores: number of worker processes to start.
        q_in: pre-filled input queue (one sentinel per worker is added).
        q_out: queue the workers write their results to.

    Returns:
        List of all result strings taken from q_out.
    """
    processes = []
    for w in range(num_cores):
        q_in.put(None)  # one shutdown sentinel per worker
        p = Process(target=worker_func, args=(q_in, q_out, w))
        processes.append(p)
        p.start()
    output_ls = []
    # Wait for the workers, draining q_out between short join attempts so
    # no child ever blocks on an undelivered result.
    while processes:
        still_alive = []
        for p in processes:
            p.join(timeout=0.1)
            if p.is_alive():
                still_alive.append(p)
        while not q_out.empty():
            output_ls.append(q_out.get())
        processes = still_alive
    # Final drain: all children have exited, so their feeders have flushed
    # everything into the pipe by now.
    while not q_out.empty():
        output_ls.append(q_out.get())
    return output_ls
# Same demonstration as before, now using the sentinel-based workers.
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out)  # Returns without issue, as before
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out)  # No longer hangs: each worker exits on its None sentinel