Here's an example. I have one producer and several consumers.
#!/usr/bin/env python2
from multiprocessing import Process, Queue
import time

def counter(low, high):
    current = low
    while current <= high:
        yield current
        current += 1

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks')

def work(id, q):
    while True:
        task = q.get()
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id)

if __name__ == '__main__':
    q = Queue(2)
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
    task_gen.start()
    for p in processes:
        p.start()
    for p in processes:
        p.join()
counter is just a number generator for put_tasks. Typically, I would have several thousand tasks instead of just 10 as in this example. The point of this code is to feed the queue with tasks incrementally.

The problem is that the consumers cannot know in advance how many tasks they will have to process, but the put_tasks function does know when it is done (it then prints "put_tasks: no more tasks").
Sample output:
process 2: 0
process 0: 1
process 1: 2
process 2: 3
process 0: 4
process 1: 5
process 2: 6
process 0: 7
process 1: 8
process 2: 9
put_tasks: no more tasks
All tasks get processed, but the program then hangs (each process gets stuck on q.get()). I would like it to terminate when all the tasks have been processed, without sacrificing speed or safety (no ugly timeouts).
Any ideas?
I suggest putting a sentinel value at the end of the queue:
def put_tasks(q):
    ...
    print('put_tasks: no more tasks')
    q.put(end_of_queue)

def work(id, q):
    while True:
        task = q.get()
        if task == end_of_queue:
            q.put(task)
            print("DONE")
            return
        print('process %d: %s' % (id, task))
        time.sleep(.1)
    print('process %d: done' % id)
class Sentinel:
    def __init__(self, id):
        self.id = id
    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id
        return NotImplemented

if __name__ == '__main__':
    q = Queue(2)
    end_of_queue = Sentinel("end of queue")
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
...
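Note that a worker that receives the sentinel puts it back on the queue before returning, so every other consumer will also receive it and shut down in turn, and the joins in the main process can complete.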
I don't seem to be able to use a plain object() as the sentinel, because the worker processes receive different (unpickled) instances, so they don't compare equal.
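A quick way to see why (a minimal standalone sketch, not part of the original code): anything going through a multiprocessing.Queue is pickled, and unpickling a plain object() yields a brand-new instance whose default equality is identity:

import pickle

marker = object()
copy = pickle.loads(pickle.dumps(marker))  # simulates the round-trip through the queue
print(marker is copy)  # False: unpickling created a new instance
print(marker == copy)  # False: object() compares by identity by default

This is why the Sentinel class above defines __eq__ to compare a stored id instead.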
If you ever wish to generate random sentinels, you can use the uuid module to generate random ids:
import uuid

class Sentinel:
    def __init__(self):
        self.id = uuid.uuid4()
    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id
        return NotImplemented
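Because the uuid is generated once in __init__ and stored on the instance, it survives pickling: copies of the same sentinel received by other processes still compare equal, while two independently created Sentinels will not.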
Finally, zch used None for a sentinel, which is perfectly adequate as long as None can never appear in the queue as a regular task. The sentinel method will work for mostly-arbitrary task values.
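For reference, here is a minimal sketch of that variant (adapted from the code above, reusing the same pass-the-sentinel-along trick; not from zch's answer verbatim):

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks')
    q.put(None)  # None serves as the end-of-queue sentinel

def work(id, q):
    while True:
        task = q.get()
        if task is None:
            q.put(None)  # pass the sentinel on so the other consumers stop too
            print('process %d: done' % id)
            return
        print('process %d: %s' % (id, task))
        time.sleep(.3)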