I am trying to code a program (in Python) that isolates all points of a similar colour to a key from an image, and I've run into a problem when I tried to optimise it with multiprocessing. The processes don't all finish giving their results, with or without using .join(). The relevant (simplified) code is this:
def func(a,b,c,d,queue):
    #*...some code that takes a long time (2+ seconds)...*
    out = [a,b,c]
    index = d
    queue.put((out, index))
procs = []
q = multiprocessing.Queue()
for p in range(nP):
    #*...some code that calculates the parameters...*
    proc = multiprocessing.Process(target=func, args=(a,b,c,d,q))
    proc.start()
    procs.append(proc)
values = []
kInds = []
while not q.empty():
    t = q.get()
    values.append(t[0])
    kInds.append(t[1])
    print(t)
#this bit V
for pro in procs:
    pro.join()
#*...rest of code...*
where the function being called puts a tuple of (a list of indices matching the colour key, the number key being matched to) onto the queue with queue.put().
As far as I can tell, without the marked bit of code to join the processes, the main process carries on and so some processes don't return all their results, as expected. When I use the .join(), though, some of the processes still don't return all their values and the code then halts completely. If I put the join before the q.get() loop, the program just halts forever. Is this the right way to make the program wait until all results are put on and read off the queue? Is there a better way to do this?
Building robust Python multiprocessing code has a number of pitfalls. The pre-built multiprocessing.Pool is a great tool, but sometimes you find yourself wanting something more/different.
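If you can express the per-key computation as a plain function that returns its result instead of writing to a queue, Pool handles the waiting and result collection for you. Below is a minimal sketch of how your loop might look with Pool; the empty task list and the placeholder comments are assumptions, not code from your program:

import multiprocessing

def func(a, b, c, d):
    # ...the long computation from the question would go here...
    out = [a, b, c]
    return out, d

if __name__ == "__main__":
    # One (a, b, c, d) tuple per key, computed however your original loop
    # computed its parameters.
    tasks = []  # e.g. [(a0, b0, c0, 0), (a1, b1, c1, 1), ...]
    with multiprocessing.Pool() as pool:
        results = pool.starmap(func, tasks)  # blocks until all tasks finish
    values = [out for out, index in results]
    kInds = [index for out, index in results]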
With your code, you are running into a race condition: the queue is emptied by the main process before all child processes have finished their computation and attempted to pass back their results. Furthermore, the Python documentation hints that the calls to queue.empty() and queue.qsize() are not safe against race conditions in general. There are generally two ways (many more are possible) I commonly use to get around race conditions with a queue:
1. Know in advance exactly how many results to expect, and read exactly that many items off the queue before joining anything: for _ in range(n_tasks): q.get() (a sketch applying this to your code follows after this list).
2. Have each worker put a sentinel object on the queue as its last action (q.put(Sentinel())), and keep reading until you have counted n_workers sentinels:

class Sentinel: pass

n_sentinels = 0
while True:
    item = q.get()
    if isinstance(item, Sentinel):  # a worker has signalled that it is finished
        n_sentinels += 1
        if n_sentinels == n_workers:
            break
    else:
        pass  # process item
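Since each of your nP processes puts exactly one item on the queue, the first approach maps straight onto your code. Here is a minimal, self-contained sketch of that pattern; nP = 4 and the parameter values are placeholders, not your real computation:

import multiprocessing

def func(a, b, c, d, queue):
    # stand-in for the long computation
    out = [a, b, c]
    queue.put((out, d))

if __name__ == "__main__":
    nP = 4  # however many processes you start
    q = multiprocessing.Queue()
    procs = []
    for p in range(nP):
        a, b, c, d = p, p, p, p  # placeholder parameters
        proc = multiprocessing.Process(target=func, args=(a, b, c, d, q))
        proc.start()
        procs.append(proc)

    values, kInds = [], []
    for _ in range(nP):        # exactly one result per process, so read nP items
        out, index = q.get()   # blocks until that result arrives
        values.append(out)
        kInds.append(index)

    for pro in procs:          # safe to join now that the queue has been drained
        pro.join()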
Another common problem when handling multiprocessing tasks with a queue is deadlock. In particular, q.get and q.put may not return right away; they may be waiting on another process or thread to take something out of the queue or put something in. Python's multiprocessing queues are backed by OS pipes, which cannot store an unlimited amount of data before q.put blocks. In your case the data is small, so it works fine to write all the data and wait for process completion before starting to read it, but this may stop working when you move away from toy examples. In general, you should not attempt to join a process before reading all the data from the queue, or else you may be waiting on the join in the main process while the child waits on q.put, and it will never join until something empties the queue.

Robust multiprocessing code must also consider that things sometimes go wrong, so I would say: do not necessarily always use, but always consider whether you should use, a timeout value for any queue or lock operation. When a timeout occurs, you get the opportunity to check, for example, whether a child process exited with a nonzero exitcode (an exception).
import queue
from multiprocessing import Process, Queue

q = Queue()
workers = []
for _ in range(n_workers):
    p = Process(target=work_func, args=(a, b, q))
    workers.append(p)
    p.start()

while any(w.exitcode is None for w in workers):  # while any worker is still running
    try:
        handle_results(q.get(True, 1))  # wait at most 1 second before checking if any workers are still alive
    except queue.Empty:
        pass  # perform other housekeeping while waiting

#for w in workers: w.join()  # technically redundant: the loop above already waits until every process has an exitcode

while not q.empty():  # once we are single-threaded again, race conditions don't apply anymore, and this should be reliable
    handle_results(q.get())  # handle any remaining queue items
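If you want the housekeeping branch to surface crashed workers rather than loop silently, one option is something like the following sketch (check_workers is a hypothetical helper, not part of the code above), called from inside the except queue.Empty: block:

def check_workers(workers):
    # Raise if any worker has already exited with a nonzero exit code,
    # e.g. because it died with an unhandled exception.
    for w in workers:
        if w.exitcode not in (None, 0):
            raise RuntimeError(
                "worker %s exited with code %s" % (w.pid, w.exitcode)
            )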