I'm just starting out with multiprocessing in Python.
I use multiprocessing.Queue objects
in my program, which I pass from process to process.
The problem is that when I call myprocess.join(), it only returns once the queue that process is writing to is empty.
So I'm wondering what the reason is, why it's built that way, and how to get around it. I could empty all my queues in my main process, but I'd like a better solution.
Bug part: What I explained above is the behaviour for queues with more than 100 elements.
In my example, if you use an initial data list of 100 elements (in range(100)), p4.join() returns as expected, but if you use 1000 or more it stops working as I explained. So I'm a little bit lost.
EDIT: The bug appears at an input data size of 533 or 534 elements.
Thanks,
Here is a small example to show you.
import random
import multiprocessing
import time
def process1(data, queue1):
    """Push every element of ``data`` onto ``queue1``, followed by ``None``.

    The trailing ``None`` is the end-of-stream marker the next stage looks for.
    """
    enqueue = queue1.put
    for element in data:
        enqueue(element)
    enqueue(None)
def process2(queue1, queue2):
    """Read items from ``queue1`` and write each one doubled to ``queue2``.

    Stops at the first ``None`` read from ``queue1`` and forwards that
    ``None`` to ``queue2`` so the next stage can stop as well.
    """
    value = queue1.get()
    while value is not None:
        queue2.put(value * 2)
        value = queue1.get()
    # Pass the end-of-stream marker downstream.
    queue2.put(None)
def process3(queue2, queue3):
    """Read items from ``queue2`` and write each one doubled to ``queue3``.

    Stops at the first ``None`` read from ``queue2`` and forwards that
    ``None`` to ``queue3`` so the next stage can stop as well.
    """
    value = queue2.get()
    while value is not None:
        queue3.put(value * 2)
        value = queue2.get()
    # Pass the end-of-stream marker downstream.
    queue3.put(None)
def process4(queue3, queue4):
    """Read items from ``queue3`` and write each one doubled to ``queue4``.

    Stops at the first ``None`` read from ``queue3``, forwards it to
    ``queue4``, and prints two progress messages before returning.
    """
    value = queue3.get()
    while value is not None:
        queue4.put(value * 2)
        value = queue3.get()
    # End-of-stream marker reached: forward it, then report.
    queue4.put(None)
    print("None received")
    print("End process4")
if __name__ == "__main__":
    # Example from the question: a four-stage pipeline in which every process
    # is joined BEFORE the main process reads anything from queue4. With a
    # large enough input the final join does not return, which is the
    # behaviour this listing is meant to demonstrate.
    start_time = time.time()
    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1, queue2, queue3, queue4 = (multiprocessing.Queue() for _ in range(4))

    # (target, args) for each stage, in pipeline order.
    stages = [
        (process1, (data, queue1)),
        (process2, (queue1, queue2)),
        (process3, (queue2, queue3)),
        (process4, (queue3, queue4)),
    ]
    workers = [
        multiprocessing.Process(target=target, args=stage_args)
        for target, stage_args in stages
    ]
    for worker in workers:
        worker.start()
    print("Proc started")

    # Join the stages in order, reporting each one as it completes.
    join_messages = ("p1 join", "p2 join", "p3 join", "p4 join")
    for worker, message in zip(workers, join_messages):
        worker.join()
        print(message)

    # Collect the results from the last queue until the None marker shows up.
    results = []
    print("While loop")
    item = queue4.get()
    while item is not None:
        results.append(item)
        item = queue4.get()

    end_time = time.time()
    print(end_time - start_time)
The issue is described in https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming in the section "Joining processes that use queues".
The basic idea is that a process which has put items on a queue will wait before terminating until all the buffered items have been fed by the "feeder" thread to the underlying pipe.
import multiprocessing
import random
import time
def process1(data, queue1):
    """Producer stage: send each item of ``data`` to ``queue1``, then ``None``.

    The final ``None`` tells the consuming stage that no more items follow.
    """
    for entry in data:
        queue1.put(entry)
    # End-of-stream marker for the consumer.
    queue1.put(None)
def process2(queue1, queue2):
    """Double every item taken from ``queue1`` and put it on ``queue2``.

    A ``None`` taken from ``queue1`` ends the stage; it is forwarded to
    ``queue2`` unchanged before returning.
    """
    while True:
        received = queue1.get()
        if received is not None:
            queue2.put(received * 2)
            continue
        # End-of-stream marker: forward it and finish.
        queue2.put(None)
        return
def process3(queue2, queue3):
    """Double every item taken from ``queue2`` and put it on ``queue3``.

    A ``None`` taken from ``queue2`` ends the stage; it is forwarded to
    ``queue3`` unchanged before returning.
    """
    while True:
        received = queue2.get()
        if received is not None:
            queue3.put(received * 2)
            continue
        # End-of-stream marker: forward it and finish.
        queue3.put(None)
        return
def process4(queue3, queue4):
    """Final stage: double every item from ``queue3`` and put it on ``queue4``.

    A ``None`` read from ``queue3`` ends the stage and is forwarded to
    ``queue4``. Progress messages and, where the platform supports it, the
    approximate size of ``queue4`` are printed before returning.

    Args:
        queue3: Input queue; yields items followed by a ``None`` marker.
        queue4: Output queue; receives the doubled items and the marker.
    """
    # Calling queue4.cancel_join_thread() here would let the process terminate
    # without waiting for the queue's feeder thread, BUT the items still
    # buffered would then be discarded instead of reaching queue4.
    # See https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # section "Joining processes that use queues".
    while True:
        item = queue3.get()
        if item is None:
            queue4.put(None)
            print("None received")
            break
        queue4.put(item * 2)
    print("End process4")
    try:
        print(queue4.qsize())
    except NotImplementedError:
        # multiprocessing.Queue.qsize() is not implemented on some platforms
        # (e.g. macOS); the size is purely informational, so do not crash.
        print("qsize() is not available on this platform")
if __name__ == "__main__":
    # Four-stage pipeline. The first three stages are joined right away, but
    # the last stage is joined only AFTER the main process has read everything
    # from queue4, so its buffered items can be flushed and it can exit.
    start_time = time.time()
    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1, queue2, queue3, queue4 = (multiprocessing.Queue() for _ in range(4))

    # (target, args) for each stage, in pipeline order.
    stages = [
        (process1, (data, queue1)),
        (process2, (queue1, queue2)),
        (process3, (queue2, queue3)),
        (process4, (queue3, queue4)),
    ]
    workers = [
        multiprocessing.Process(target=target, args=stage_args)
        for target, stage_args in stages
    ]
    for worker in workers:
        worker.start()
    print("Proc started")

    # Join every stage except the last one; the last stage's output queue has
    # not been consumed yet, so it is joined further down.
    *upstream, last_stage = workers
    for worker, message in zip(upstream, ("p1 join", "p2 join", "p3 join")):
        worker.join()
        print(message)

    # Drain queue4 until the None marker arrives.
    results = []
    print("While loop")
    item = queue4.get()
    while item is not None:
        results.append(item)
        item = queue4.get()
    end_time = time.time()

    # queue4 has been consumed, so joining the last stage is done here.
    last_stage.join()
    print("p4 join")
    print(end_time - start_time)
Prints:
Proc started
p1 join
None received
End process4
10001
p2 join
p3 join
While loop
p4 join
0.1303730010986328