python multithreading design-patterns queue producer-consumer

How do I end this producer-consumer script?


I am trying to learn the Producer-Consumer pattern by implementing it in Python. I can make it work, but the consumers keep waiting for something in the queue, so the script never ends.

I know this is the expected behaviour, since the producer can keep adding things to the queue at a different rate than the consumers consume them. However, in my case, I already have a list to be processed by the queue, and I can guarantee that no other items will be added in the future.

Here is the full working code:

from threading import Thread
import time
import random
from queue import Queue

queue = Queue(10)

class ProducerThread(Thread):
    def __init__(self, nums):
        super().__init__()
        self.nums = nums

    def run(self):
        global queue
        while self.nums:
            num = self.nums.pop(0)
            queue.put(num)
            print("Produced", num)
            time.sleep(1)

class ConsumerThread(Thread):
    def __init__(self, id):
        super().__init__()
        self.id = id

    def run(self):
        global queue
        while True:
            num = queue.get()
            ##do something here
            queue.task_done()
            print(f"Consumed {num} in consumer {self.id}")
            time.sleep(1)
 


p = ProducerThread(list(range(5)))

l1 = ConsumerThread(1)
l2 = ConsumerThread(2)

p.start()
l1.start()
l2.start()

p.join()
l1.join()
l2.join()

What condition can I use in place of the consumer's while True so that it knows the script is over?

Thanks in advance.


Solution

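  • My answer written out, as you requested: the producer puts a stop token on the queue once its list is empty, and a consumer leaves its loop when it reads that token.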

    STOP_TOKEN = "STOP" # Anything that wouldn't normally be in your queue.
    
    class ProducerThread(Thread):
        ...
        def run(self):
            global queue
            while self.nums:
                num = self.nums.pop(0)
                queue.put(num)
                print("Produced", num)
                time.sleep(1)
            # The list is exhausted: signal that no more items are coming.
            queue.put(STOP_TOKEN)
    
    class ConsumerThread(Thread):
        ...
        def run(self):
            global queue
            while True:
                num = queue.get()
                if num == STOP_TOKEN:
                    # A token is removed by whichever consumer reads it, so put
                    # it back for the other consumers before leaving the loop.
                    queue.put(STOP_TOKEN)
                    queue.task_done()
                    break
                ##do something here
                queue.task_done()
                print(f"Consumed {num} in consumer {self.id}")
                time.sleep(1)
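
For reference, here is a minimal self-contained sketch of the same stop-token idea that can be run as-is. It is not the code from the question: the names `STOP`, `producer`, `consumer` and `run_pipeline` are made up for illustration, the sleeps and prints are dropped, and the producer puts one token per consumer instead of a single token. It also uses a unique `object()` as the token, assuming you would rather not reserve a string value that might show up as real data.

```python
import threading
from queue import Queue

# Hypothetical sentinel: a unique object can never collide with a real work item.
STOP = object()

def producer(q, nums, n_consumers):
    for num in nums:
        q.put(num)
    # One stop token per consumer, so each consumer receives its own signal.
    for _ in range(n_consumers):
        q.put(STOP)

def consumer(q, results, lock):
    while True:
        item = q.get()
        try:
            if item is STOP:
                break
            # "Do something here": this sketch just records the item.
            with lock:
                results.append(item)
        finally:
            # Runs for every get(), including the stop token.
            q.task_done()

def run_pipeline(nums, n_consumers=2):
    q = Queue(10)
    results = []
    lock = threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, results, lock))
        for _ in range(n_consumers)
    ]
    prod = threading.Thread(target=producer, args=(q, list(nums), n_consumers))
    for t in consumers:
        t.start()
    prod.start()
    prod.join()
    for t in consumers:
        t.join()  # returns because every consumer eventually reads a token
    return results
```

Calling `run_pipeline(range(5))` returns the five numbers in whatever order the consumers happened to pick them up, and then the script exits because all threads have been joined.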