Why doesn't the producer-consumer stop?


I found an example that implements producer-consumer with two threads. But when I send a signal to the process to stop it, it doesn't stop. It needs a second signal, e.g. SIGKILL, to stop completely. I thought the problem was with task_done(), but it seems not.

import time

import queue
import threading
import random


class Producer(threading.Thread):
    """
    Produces random integers and puts them on a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Thread run method. Puts a random integer on the queue
        once per second.
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)


class Consumer(threading.Thread):
    """
    Consumes random integers from a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Thread run method. Consumes integers from the queue
        """
        while True:
            integer = self.queue.get()
            print('%d popped from list by %s' % (integer, self.name))
            self.queue.task_done()


def main():
    q = queue.Queue()
    t1 = Producer(q)
    t2 = Consumer(q)
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == '__main__':
    main()

Output:

210 put to queue by Thread-1
210 popped from list by Thread-2
Traceback (most recent call last):
  File "/Users/abc/PycharmProjects/untitled1/ssid.py", line 74, in <module>
    main()
  File "/Users/abc/PycharmProjects/untitled1/ssid.py", line 69, in main
    t1.join()
  File "/usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
244 put to queue by Thread-1
244 popped from list by Thread-2
85 put to queue by Thread-1
85 popped from list by Thread-2
160 put to queue by Thread-1
160 popped from list by Thread-2

Solution

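  • It's because the KeyboardInterrupt is raised only in the main thread, so only the main thread gets stopped. The child threads are non-daemon threads, so they keep running and the interpreter waits for them before it exits. You can observe this by having your child threads print threading.enumerate(), which returns all threads that are still alive, including the main thread.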

    import time
    import queue
    import threading
    import random
    
    
    class Producer(threading.Thread):
    
        def __init__(self, queue):
            super().__init__()
            self.queue = queue
    
        def run(self):
            while True:
                integer = random.randint(0, 256)
                self.queue.put(integer)
                print(f'{integer} put to queue by {self.name} '
                      f'threads: {threading.enumerate()}')
                time.sleep(1)
    
    
    class Consumer(threading.Thread):
    
        def __init__(self, queue):
            super().__init__()
            self.queue = queue
    
        def run(self):
            while True:
                integer = self.queue.get()
                print(f'{integer} popped from list by {self.name} '
                      f'threads:{threading.enumerate()}')
                self.queue.task_done()
    
    
    def main():
        q = queue.Queue()
        t1 = Producer(q)
        t2 = Consumer(q)
        # t1.daemon = True
        # t2.daemon = True
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
    
    if __name__ == '__main__':
        try:
            main()
        except KeyboardInterrupt:
            print('got KeyboardInterrupt')
    

    Example output with KeyboardInterrupt. Note that MainThread is listed as 'stopped' after the KeyboardInterrupt:

    97 put to queue by Thread-1 threads: [<_MainThread(MainThread, started 
    139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
    <Consumer(Thread-2, started 139810242520832)>]
    97 popped from list by Thread-2 threads:[<_MainThread(MainThread, started 
    139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
    <Consumer(Thread-2, started 139810242520832)>]
    got KeyboardInterrupt
    92 put to queue by Thread-1 threads: [<_MainThread(MainThread, stopped 
    139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
    <Consumer(Thread-2, started 139810242520832)>]
    92 popped from list by Thread-2 threads:[<_MainThread(MainThread, stopped 
    139810293606208)>, <Producer(Thread-1, started 139810250913536)>, 
    <Consumer(Thread-2, started 139810242520832)>]
    

    You could make the child threads daemons so that they exit together with the main thread. But that should only be considered if your threads don't hold any resources. From the Python documentation:

    Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.
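
As a rough illustration of the Event-based signalling mentioned in that note, here is a minimal sketch. The names run_until_stopped, demo and interval are made up for this example and are not part of the question's code. It relies on Event.wait(timeout) returning False on timeout and True once the event has been set, so it can stand in for time.sleep() while staying interruptible.

```python
import threading
import time


def run_until_stopped(stop_event, interval=0.01):
    """Loop until stop_event is set; return how many iterations ran.

    Hypothetical helper for illustration only.
    """
    ticks = 0
    # wait() returns False when the timeout expires and True once the
    # event is set, so the loop ends shortly after stop_event.set().
    while not stop_event.wait(interval):
        ticks += 1  # the thread's real work would go here
    return ticks


def demo():
    stop_event = threading.Event()
    result = []
    worker = threading.Thread(
        target=lambda: result.append(run_until_stopped(stop_event)))
    worker.start()
    try:
        time.sleep(0.1)   # the main thread does its own work here
    finally:
        stop_event.set()  # ask the worker to finish
        worker.join()     # wait until it has returned
    return worker.is_alive(), result
```

Because the set() and join() calls sit in a finally block, they also run if a KeyboardInterrupt arrives while the main thread is inside the try block.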

    The better way would be to catch the KeyboardInterrupt as in the code above and send a sentinel value over the queue to the child threads to let them know they should finish, allowing them to clean up before they exit.
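
One possible way to put this together is sketched below; it is not the only design. Since the producer in the question never reads from the queue, this sketch assumes it is stopped with a threading.Event and then puts the sentinel on the queue itself for the consumer. SENTINEL, stop_event, consumed and run_seconds are names invented for this example, and the sleep interval is shortened from the question's one second.

```python
import queue
import random
import threading

SENTINEL = None  # hypothetical marker meaning "no more items will arrive"


class Producer(threading.Thread):

    def __init__(self, q, stop_event):
        super().__init__()
        self.q = q
        self.stop_event = stop_event

    def run(self):
        # wait() acts as an interruptible sleep: it returns True as soon
        # as the event is set, which ends the loop.
        while not self.stop_event.wait(0.01):
            self.q.put(random.randint(0, 256))
        # Tell the consumer that nothing more will be produced.
        self.q.put(SENTINEL)


class Consumer(threading.Thread):

    def __init__(self, q):
        super().__init__()
        self.q = q
        self.consumed = []

    def run(self):
        while True:
            item = self.q.get()
            try:
                if item is SENTINEL:
                    break  # clean-up code could run after the loop
                self.consumed.append(item)
            finally:
                self.q.task_done()


def main(run_seconds=None):
    q = queue.Queue()
    stop_event = threading.Event()
    producer = Producer(q, stop_event)
    consumer = Consumer(q)
    producer.start()
    consumer.start()
    try:
        # With run_seconds=None this blocks until interrupted.
        producer.join(run_seconds)
    except KeyboardInterrupt:
        print('got KeyboardInterrupt, shutting down')
    finally:
        stop_event.set()  # the producer stops and sends the sentinel
        producer.join()
        consumer.join()
    return consumer
```

Calling main() without an argument keeps both threads running until Ctrl+C is pressed; main(0.2) stops them after roughly 0.2 seconds, which is convenient for trying the shutdown path. With several consumers, each one would need its own sentinel.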