I have some code that farms out work to tasks. The tasks put their results on a queue, and the main thread reads these results from the queue and deals with them.

    from multiprocessing import Process, Queue, Pool, Manager
    import uuid

    def handle_task(arg, queue, end_marker):
        ...  # add some number of results to the queue
        queue.put(end_marker)

    def main(tasks):
        manager = Manager()
        queue = manager.Queue()
        count = len(tasks)
        end_marker = uuid.uuid4()
        with Pool() as pool:
            pool.starmap(handle_task, ((task, queue, end_marker) for task in tasks))
            while count > 0:
                value = queue.get()
                if value == end_marker:
                    count -= 1
                else:
                    ...  # deal with value

This code works, but it is incredibly kludgy and inelegant. What if tasks is an iterator? Why do I need to know how many tasks there are ahead of time and keep track of each of them?
Is there a cleaner way of reading from a Queue and knowing that every process that will write to that queue is done, and that you've read everything they've written?
First of all, operations on a managed queue are very slow compared to a multiprocessing.Queue instance. But why are you even using an additional queue to return results when a multiprocessing pool already uses such a queue for returning results? Instead of having handle_task write some number of result values to a queue, it could simply return a list of these values. For example:

    from multiprocessing import Pool

    def handle_task(arg):
        results = []
        # Add some number of results to the results list:
        results.append(arg + arg)
        results.append(arg * arg)
        return results

    def main(tasks):
        with Pool() as pool:
            map_results = pool.map(handle_task, tasks)
            for results in map_results:
                for value in results:
                    # Deal with value:
                    print(value)

    if __name__ == '__main__':
        main([7, 2, 3])

Prints:

    14
    49
    4
    4
    6
    9

As a side benefit, the results returned will be in task-submission order, which one day might be important. If you want to process the returned values as they become available, you can use pool.imap or, if you don't care about the order of the returned values (which seems to be the case), pool.imap_unordered:

    from multiprocessing import Pool

    def handle_task(arg):
        results = []
        # Add some number of results to the results list:
        results.append(arg + arg)
        results.append(arg * arg)
        return results

    def main(tasks):
        with Pool() as pool:
            for results in pool.imap_unordered(handle_task, tasks):
                for value in results:
                    # Deal with value:
                    print(value)

    if __name__ == '__main__':
        main([7, 2, 3])

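The question also asks what happens if tasks is an iterator. As a minimal sketch (the generator generate_tasks and the helper iter_values are hypothetical names introduced here for illustration), imap_unordered accepts any iterable and never needs len(tasks), and itertools.chain.from_iterable flattens the per-task lists into a single stream of values:

```python
from itertools import chain
from multiprocessing import Pool


def handle_task(arg):
    # Same illustrative work as above: return all results for one task.
    return [arg + arg, arg * arg]


def generate_tasks():
    # Hypothetical task source whose length is not known in advance.
    yield from (7, 2, 3)


def iter_values(tasks):
    # Yield individual values as each task's result list arrives.
    # The order of tasks is arbitrary because imap_unordered is used.
    with Pool() as pool:
        yield from chain.from_iterable(pool.imap_unordered(handle_task, tasks))


if __name__ == '__main__':
    for value in iter_values(generate_tasks()):
        # Deal with value:
        print(value)
```

The main loop ends when the pool has delivered every result, so no end markers or task counts are involved.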
If the number of tasks being submitted is "large", then you should probably use the chunksize argument of the imap_unordered method. A reasonable value would be len(tasks) / (4 * pool_size), rounded up, where the pool size defaults to multiprocessing.cpu_count(). This is more or less how the chunksize value is computed when you use the map or starmap methods without specifying the chunksize argument.
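The chunksize heuristic just described could be sketched as follows. The helper compute_chunksize is a hypothetical name, not part of multiprocessing, and clamping the result to at least 1 is an added assumption so that an empty task list still yields a valid chunksize:

```python
from multiprocessing import Pool, cpu_count


def compute_chunksize(task_count, pool_size):
    # Ceiling of task_count / (4 * pool_size), but never less than 1.
    chunksize, extra = divmod(task_count, 4 * pool_size)
    if extra:
        chunksize += 1
    return max(1, chunksize)


def handle_task(arg):
    # Same illustrative work as above: return all results for one task.
    return [arg + arg, arg * arg]


def main(tasks):
    pool_size = cpu_count()
    chunksize = compute_chunksize(len(tasks), pool_size)
    total = 0
    with Pool(pool_size) as pool:
        for results in pool.imap_unordered(handle_task, tasks, chunksize=chunksize):
            for value in results:
                # Deal with value (here, simply accumulate it):
                total += value
    return total


if __name__ == '__main__':
    print(main(list(range(1000))))
```

Note that this needs len(tasks), so for a pure iterator you would have to pick a fixed chunksize instead.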
Using a multiprocessing.Queue instance

    from multiprocessing import Pool, Queue
    from queue import Empty

    def init_pool_processes(q):
        # Runs once in each pool process to make the queue available globally.
        global queue
        queue = q

    def handle_task(arg):
        # Add some number of results to the queue:
        queue.put(arg + arg) # Referencing the global queue
        queue.put(arg * arg)

    def main(tasks):
        queue = Queue()
        with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
            pool.map(handle_task, tasks)  # Blocks until all tasks have completed
            try:
                while True:
                    value = queue.get_nowait()
                    print(value)
            except Empty:
                pass

    if __name__ == '__main__':
        main([7, 2, 3])

Although calling queue.empty() is not supposed to be reliable for a multiprocessing.Queue instance, as long as you are doing this after all the tasks have finished processing, it seems no more unreliable than relying on non-blocking get_nowait calls raising an Empty exception only after all items have been retrieved:

    from multiprocessing import Pool, Queue

    def init_pool_processes(q):
        # Runs once in each pool process to make the queue available globally.
        global queue
        queue = q

    def handle_task(arg):
        # Add some number of results to the queue:
        queue.put(arg + arg) # Referencing the global queue
        queue.put(arg * arg)

    def main(tasks):
        queue = Queue()
        with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
            pool.map(handle_task, tasks)  # Blocks until all tasks have completed
            while not queue.empty():
                value = queue.get_nowait()
                print(value)

    if __name__ == '__main__':
        main([7, 2, 3])

But if you want to do everything strictly according to what the documentation implies is the only reliable method when using a multiprocessing.Queue instance, that would be by using sentinels, as you are already doing:

    from multiprocessing import Pool, Queue

    class Sentinel:
        pass

    SENTINEL = Sentinel()

    def init_pool_processes(q):
        # Runs once in each pool process to make the queue available globally.
        global queue
        queue = q

    def handle_task(arg):
        # Add some number of results to the queue:
        queue.put(arg + arg) # Referencing the global queue
        queue.put(arg * arg)
        queue.put(SENTINEL)  # Signal that this task has no more results

    def main(tasks):
        queue = Queue()
        with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
            pool.map_async(handle_task, tasks) # Does not block
            # Read inside the with block: leaving it terminates the pool.
            sentinel_count = len(tasks)
            while sentinel_count != 0:
                value = queue.get()
                if isinstance(value, Sentinel):
                    sentinel_count -= 1
                else:
                    print(value)

    if __name__ == '__main__':
        main([7, 2, 3])

Conclusion
If you need to use a queue for output, I would recommend a multiprocessing.Queue. In this case using sentinels is really the only 100% correct way of proceeding. I would also use the map_async method so that you can start processing results as they are returned.
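The sentinel approach can also cope with tasks being an iterator. What follows is only a minimal sketch, assuming each task is submitted individually with apply_async and counted as it is submitted, so that len(tasks) is never needed. The names result_queue, iter_values, and pending are introduced here for illustration, and the try/finally is an added safeguard so that a failing task still posts its sentinel:

```python
from multiprocessing import Pool, Queue


class Sentinel:
    pass


def init_pool_processes(q):
    # Runs once in each pool process; result_queue is a hypothetical name.
    global result_queue
    result_queue = q


def handle_task(arg):
    try:
        # Add some number of results to the queue:
        result_queue.put(arg + arg)
        result_queue.put(arg * arg)
    finally:
        # Always signal completion, even if the work above raised.
        result_queue.put(Sentinel())


def iter_values(tasks):
    queue = Queue()
    with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
        pending = 0
        for task in tasks:  # tasks may be any iterable, including a generator
            pool.apply_async(handle_task, (task,))
            pending += 1
        # One sentinel is expected for every task that was submitted.
        while pending:
            value = queue.get()
            if isinstance(value, Sentinel):
                pending -= 1
            else:
                yield value


if __name__ == '__main__':
    for value in iter_values(iter([7, 2, 3])):
        print(value)
```

The tasks are still counted, but the counting happens during submission rather than up front.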
Using a Managed Queue
This is Pingu's answer, which remains deleted for now:

    from multiprocessing import Pool, Manager
    from random import randint

    def process(n, q):
        for x in range(randint(1, 10)):
            q.put((n, x))

    def main():
        with Manager() as manager:
            queue = manager.Queue()
            with Pool() as pool:
                pool.starmap(process, [(n, queue) for n in range(5)])
            while not queue.empty():
                print(queue.get())

    if __name__ == '__main__':
        main()