I am trying to implement a multithreaded producer-consumer pattern using Queue.Queue in Python 2.7, and I want to figure out how to make the consumers, i.e. the worker threads, stop once all required work is done.
See the second comment by Martin James on this answer: https://stackoverflow.com/a/19369877/1175080
Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.
But this does not work for me, as the following code shows.
```python
import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1) # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
```
This program hangs after all three workers have read the exit indicator, -1, from the queue. Because each worker requeues -1 before exiting, the queue never becomes empty: the last -1 that is put back is never retrieved and marked with task_done(), so q.join() never returns.
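The hang comes from q.join() rather than from the requeue idea itself: Queue.join() waits until every put() has been matched by a task_done(), and the final requeued -1 is never retrieved. Below is a minimal sketch, assuming it is acceptable for the master to wait on the threads with Thread.join() instead of on the queue. The names STOP, results, num_workers and num_items are illustrative, appending to a shared list stands in for real processing, and the module is imported as queue so the sketch runs on both Python 2.7 and 3.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7

STOP = -1  # same exit indicator as in the question

def worker(n, q, results):
    while True:
        data = q.get()
        if data == STOP:
            q.put(STOP)  # requeue so the next worker also sees it
            q.task_done()
            break
        results.append((n, data))  # stand-in for real processing
        q.task_done()

def master(num_workers=3, num_items=10):
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(i, q, results))
               for i in range(num_workers)]
    for t in threads:
        t.start()
    for i in range(num_items):
        q.put(i)
    q.put(STOP)  # a single stop marker, passed along from worker to worker
    # Wait on the threads, not the queue: the last requeued STOP is never
    # consumed, so q.join() would block forever.
    for t in threads:
        t.join()
    return results

if __name__ == '__main__':
    print(len(master()))
```

Because the queue is FIFO and STOP is put after all the work items, every item is taken before any worker sees STOP, so each thread finishes its current item before exiting.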
I came up with the following working but ugly solution, in which I send a -1 exit indicator for each worker via the queue, so that each worker can see it and commit suicide. But having to send one exit indicator per worker feels a little ugly.
```python
import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1) # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
```
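Sending one stop marker per worker is a common way to shut down a thread pool. Below is a sketch of two small refinements, assuming results are collected in a shared list (the names STOP, run and results are hypothetical). A unique object() sentinel cannot collide with a real work item the way -1 could, and deriving the number of sentinels from the thread list removes the hard-coded 3. The built-in two-argument form iter(callable, sentinel) calls q.get() repeatedly until it returns STOP.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7

# A unique object compares equal only to itself, so it can never be
# mistaken for real data, unlike -1.
STOP = object()

def worker(q, results):
    # iter(q.get, STOP) yields items until q.get() returns STOP.
    for data in iter(q.get, STOP):
        results.append(data * 2)  # stand-in for real processing
        q.task_done()
    q.task_done()  # account for the STOP item itself

def run(items, num_workers=3):
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(q, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    # One STOP per thread, counted from the thread list.
    for _ in threads:
        q.put(STOP)
    q.join()  # returns: every put(), including each STOP, gets a task_done()
    for t in threads:
        t.join()
    return results
```

For example, run(range(10)) returns the ten doubled values in whatever order the workers happened to finish them.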
I have two questions.
Don't make the stop signal a special kind of task.
Use a threading.Event instead, and have your workers wait on the queue with a timeout, so that they never block indefinitely and can check the event regularly.
```python
import Queue
import threading

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except Queue.Empty:
            continue
        # ... process data here ...
        q.task_done()

def master():
    # ... start the workers and put the work on q, as in the question ...
    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'
```
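Here is a self-contained version of the Event approach, with the elided parts of master() filled in for illustration only. The worker signature, the results list and the 0.1 s timeout are assumptions, not part of the answer, and the event is passed as an argument instead of living in a module-level global. It imports the module as queue so it runs on Python 2.7 and 3.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7

def worker(q, stopping, results, timeout=0.1):
    # Keep pulling work until the master sets the stopping event.
    while not stopping.is_set():
        try:
            data = q.get(True, timeout)
        except queue.Empty:
            continue  # nothing to do yet; go back and re-check the event
        results.append(data + 1)  # stand-in for real processing
        q.task_done()

def master(items, num_workers=3):
    q = queue.Queue()
    stopping = threading.Event()
    results = []
    threads = [threading.Thread(target=worker, args=(q, stopping, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.join()        # every item has been processed and marked done
    stopping.set()  # tell the now-idle workers to leave their loop
    for t in threads:
        t.join()    # each worker exits within roughly one timeout
    return results
```

One caveat with this design: if the processing step raises, task_done() is never called and q.join() blocks forever, so wrapping the processing in try/finally is one way to guard against that.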