Tags: python, multithreading, python-2.7, concurrency, producer-consumer

How to make worker threads quit after work is finished in a multithreaded producer-consumer pattern?


I am trying to implement a multithreaded producer-consumer pattern using Queue.Queue in Python 2.7, and I want the consumers (the worker threads) to stop once all required work is done.

See the second comment by Martin James on this answer: https://stackoverflow.com/a/19369877/1175080

The comment says: "Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide."

But this does not work for me, as the following code shows.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

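This program hangs after all three workers have read the exit indicator (-1) from the queue. Each worker requeues -1 before exiting, and every put() adds a new unfinished task, so the -1 requeued by the last worker is never matched by a task_done() call. The queue's count of unfinished tasks never drops back to zero, and q.join() in the main thread never returns.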
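The task accounting behind the hang can be replayed without threads or sleeps. The sketch below is not from the original thread: it performs the three workers' get(), task_done() and put(-1) steps one after another in a single thread, then runs q.join() in a background thread and checks whether it returns within half a second (an arbitrary timeout). The try/except import is an assumption added so the block runs on both Python 2.7 and 3.

```python
# Runs on Python 2.7 and 3.
try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2.7
import threading

q = queue.Queue()
q.put(-1)                    # the single exit indicator: 1 unfinished task

# Replay, one after another, what each of the three workers does with -1.
for n in range(3):
    data = q.get()
    q.task_done()            # balances this get() ...
    q.put(data)              # ... but the requeue adds a new unfinished task

# Call q.join() from a background thread so that we can observe it blocking.
waiter = threading.Thread(target=q.join)
waiter.daemon = True         # don't let the blocked thread keep the process alive
waiter.start()
waiter.join(0.5)             # wait at most half a second for q.join() to return

still_blocked = waiter.is_alive()
left_over = q.qsize()
print("q.join() still blocked: %s, items left in queue: %d" % (still_blocked, left_over))
```

The one -1 left in the queue is exactly the unfinished task that q.join() keeps waiting for.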

I came up with the following working but ugly solution: I send one -1 exit indicator per worker through the queue, so that each worker receives one and exits. Having to send a separate exit indicator for each worker feels a little ugly, though.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
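Some of that ugliness goes away if the number of exit indicators is derived from the list of threads rather than hard-coded, and if a unique object replaces -1, which could collide with real data. The following is a minimal sketch under those assumptions; STOP, run(), the results list and the doubling "work" are hypothetical placeholders, and the import shim lets it run on Python 2.7 and 3.

```python
try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2.7
import threading

STOP = object()  # hypothetical sentinel: unlike -1, it cannot collide with real data

def worker(q, results):
    while True:
        data = q.get()
        try:
            if data is STOP:
                return               # this worker has received its own indicator
            results.append(data * 2) # stand-in for real processing
        finally:
            q.task_done()            # also runs for STOP, so q.join() can finish

def run(num_workers=3, items=range(10)):
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(q, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:                # one indicator per worker, derived from the list
        q.put(STOP)
    q.join()                         # every item and every STOP gets task_done()
    for t in threads:
        t.join()
    return sorted(results)

print(run())
```

Because each worker returns as soon as it sees STOP, it can never take a second one, so every worker receives exactly one indicator.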

I have two questions.

  1. Can the method of sending a single exit indicator for all threads (as explained in the second comment by Martin James on https://stackoverflow.com/a/19369877/1175080) work at all?
  2. If the answer to the previous question is "No", is there a way to solve the problem without sending a separate exit indicator for each worker thread?
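Regarding question 1, the requeue technique can work if the master waits on the worker threads themselves with Thread.join() instead of on q.join(). The leftover requeued indicator then no longer matters, and task_done() is unnecessary because nothing calls q.join(). This is a sketch not taken from the original thread; STOP, run(), the results list and the doubling "work" are hypothetical placeholders.

```python
try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2.7
import threading

STOP = object()  # hypothetical sentinel standing in for -1

def worker(q, results):
    while True:
        data = q.get()
        if data is STOP:
            q.put(STOP)              # requeue it for the next worker ...
            return                   # ... and exit
        results.append(data * 2)     # stand-in for real processing

def run(num_workers=3, items=range(10)):
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(q, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.put(STOP)                      # a single exit indicator for all workers
    for t in threads:
        t.join()                     # wait on the threads, not on q.join()
    # Return the processed results and how many items remain in the queue.
    return sorted(results), q.qsize()

print(run())
```

The queue still holds the one requeued STOP at the end, which is harmless once every worker thread has exited.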

Solution

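  • Don't model the stop signal as a special kind of task.

    Use a threading.Event instead, and have the workers read from the queue with a timeout instead of blocking indefinitely, so that they can periodically check the event.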

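    import Queue
    import threading
    
    stopping = threading.Event()
    
    def worker(n, q, timeout=1):
        # run until the master thread indicates we're done
        while not stopping.is_set():
            try:
                # don't block indefinitely so we can return to the top
                # of the loop and check the stopping event
                data = q.get(True, timeout)
            # raised by q.get if we reach the timeout on an empty queue
            except Queue.Empty:
                continue
            print 'worker', n, 'got', data  # process the item here
            q.task_done()
        print 'worker', n, 'is exiting'
    
    def master():
        # ... create q, start the workers and put the work items on q,
        # as in the question's master() ...
    
        print 'waiting for workers to finish'
        # q.join() returns once every item has been marked with task_done()
        q.join()
        # only now tell the idle workers to leave their polling loops
        stopping.set()
        print 'done'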
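Since master() is elided above, here is one self-contained way to put the Event-based pieces together. It is a sketch under several assumptions: the event is passed to the workers as an argument instead of being a global, the poll timeout is shortened to a hypothetical 0.1 s so shutdown is quick, master() returns its results for checking, and an import shim makes it run on Python 2.7 and 3.

```python
try:
    import queue             # Python 3
except ImportError:
    import Queue as queue    # Python 2.7
import threading

def worker(q, stopping, results, timeout=0.1):
    # Keep polling until the master sets the event.
    while not stopping.is_set():
        try:
            data = q.get(True, timeout)   # block for at most `timeout` seconds
        except queue.Empty:
            continue                      # nothing arrived; re-check the event
        try:
            results.append(data * 2)      # stand-in for real processing
        finally:
            q.task_done()

def master(num_workers=3, items=range(10)):
    q = queue.Queue()
    stopping = threading.Event()
    results = []
    threads = [threading.Thread(target=worker, args=(q, stopping, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.join()              # every item has been processed
    stopping.set()        # now the idle workers may leave their loops
    for t in threads:
        t.join()          # each one exits within about one timeout
    # Return the processed results and whether any worker is still running.
    return sorted(results), any(t.is_alive() for t in threads)

print(master())
```

The trade-off is that idle workers wake up once per timeout just to check the event. Also, stopping.set() has to come after q.join(): setting it earlier could let workers leave while items are still queued.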