Tags: python, multithreading, queue, producer-consumer

Data type for a "closable" queue to handle a stream of items for multiple producers and consumers


Is there a specific type of Queue that is "closable" and is suitable for when there are multiple producers and consumers, and the data comes from a stream (so it's not known when it will end)?

I've been unable to find a queue that implements this sort of behavior, or a name for one, but it seems like an integral type for producer-consumer problems.

As an example, ideally I could write code where (1) each producer would tell the queue when it was done, (2) consumers would blindly call a blocking get(), and (3) when all consumers were done, and the queue was empty, all the producers would unblock and receive a "done" notification.

As code, it'd look something like this:

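from random import randint, random
from threading import Thread
from time import sleep

def produce():
    for x in range(randint(1, 10)):   # some unknown number of items
        queue.put(x)
        sleep(random())               # items arrive at unpredictable times
    queue.close()  # called once by every producer

def consume():
    while True:
        try:
            print(queue.get())        # blocks until an item arrives
        except ClosedQueue:           # all producers closed and the queue is drained
            print('done!')
            break

num_producers = randint(1, 5)
queue = QueueTypeThatICantFigureOutANameFor(num_producers)
for _ in range(num_producers):
    Thread(target=produce).start()
for _ in range(randint(1, 5)):        # consumer count is independent of producers
    Thread(target=consume).start()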

Also, I'm not looking for the "Poison Pill" solution, where a "done" value is added to the queue for every consumer -- I don't like the inelegance of producers needing to know how many consumers there are.


Solution

  • I'd call that a self-latching queue.

    For your primary requirement, combine the queue with a condition-variable check that gracefully latches (shuts down) the queue once every producer has called close():

    class SelfLatchingQueue(LatchingQueue):
        ...
        def __init__(self, num_producers):
            ...

        def close(self):
            '''Called by a producer to indicate that it is done producing'''

            # ... perhaps check that the current thread is a known producer? ...

            with self.a_mutex:
                self._num_active_producers -= 1
                if self._num_active_producers <= 0:
                    # Future put()s raise QueueLatched; get()s will drain the
                    # queue and then raise QueueEmpty thereafter
                    self.latch()  # Guess which superclass implements this?
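The outline above leans on a LatchingQueue superclass and a latch() method that are not part of the standard library. As a minimal, self-contained sketch of the same idea, assuming an unbounded FIFO is acceptable, the class below keeps a producer counter, a deque, and a single threading.Condition. The exception names QueueLatched and QueueEmpty are hypothetical, borrowed from the comments in the outline; they are not the standard library's queue.Empty.

```python
import collections
import threading


class QueueLatched(Exception):
    """Hypothetical: raised by put() once every producer has called close()."""


class QueueEmpty(Exception):
    """Hypothetical: raised by get() once the queue is latched and fully drained."""


class SelfLatchingQueue:
    """Unbounded FIFO that latches itself when all producers have closed."""

    def __init__(self, num_producers):
        self._items = collections.deque()
        self._num_active_producers = num_producers
        self._latched = num_producers <= 0
        # One condition guards all state; consumers wait on it for items or the latch.
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            if self._latched:
                raise QueueLatched()
            self._items.append(item)
            self._cond.notify()  # wake one waiting consumer

    def close(self):
        """Called once by each producer when it is done producing."""
        with self._cond:
            self._num_active_producers -= 1
            if self._num_active_producers <= 0:
                self._latched = True
                self._cond.notify_all()  # wake every consumer so each sees the latch

    def get(self):
        """Block for an item; raise QueueEmpty once latched and drained."""
        with self._cond:
            while not self._items and not self._latched:
                self._cond.wait()
            if self._items:
                return self._items.popleft()
            raise QueueEmpty()


def run_demo(num_producers=3, num_consumers=4, items_per_producer=100):
    """Consumers never need to know how many producers there are."""
    q = SelfLatchingQueue(num_producers)
    results = [[] for _ in range(num_consumers)]

    def produce():
        for i in range(items_per_producer):
            q.put(i)
        q.close()  # each producer closes exactly once

    def consume(out):
        while True:
            try:
                out.append(q.get())
            except QueueEmpty:  # latched and drained: this consumer is done
                return

    threads = [threading.Thread(target=produce) for _ in range(num_producers)]
    threads += [threading.Thread(target=consume, args=(r,)) for r in results]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [x for r in results for x in r]


if __name__ == "__main__":
    items = run_demo()
    print(len(items))  # 300 with the defaults: every item is consumed exactly once
```

Because latching only happens after the last close(), and every producer closes after its final put(), no item can be rejected or lost; consumers keep returning items until the deque is empty and only then see QueueEmpty. On Python 3.13 or later, the standard queue.Queue also has a shutdown() method and a queue.ShutDown exception with similar "drain, then raise" behavior for get(), so the producer counter could sit on top of that instead of a hand-rolled deque.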
    

    For your secondary requirement (#3 in the original post, finished producers apparently block until all consumers are finished), I'd perhaps use a barrier or just another condition variable. This could be implemented in a subclass of the SelfLatchingQueue, of course, but without knowing the codebase I'd keep this behavior separate from the automatic latching.
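For the secondary requirement, a minimal sketch of the barrier option using the standard threading.Barrier is below. It assumes the code that launches the threads knows both counts (the producers themselves still don't need to know how many consumers exist). The sleeps are placeholders for "produce, then close()" and "drain until the queue reports it is done"; the function name run_barrier_demo is hypothetical.

```python
import threading
import time


def run_barrier_demo(num_producers=2, num_consumers=3):
    """Finished producers wait at a barrier until every consumer has finished too."""
    # One party per thread; only the launcher needs both counts.
    all_done = threading.Barrier(num_producers + num_consumers)
    log = []
    log_lock = threading.Lock()

    def record(event):
        with log_lock:
            log.append(event)

    def producer(i):
        time.sleep(0.01 * i)         # placeholder for producing and then calling close()
        record(("producer closed", i))
        all_done.wait()              # block until all consumers have arrived as well
        record(("producer released", i))

    def consumer(i):
        time.sleep(0.05 + 0.01 * i)  # placeholder for draining the queue until it is done
        record(("consumer finished", i))
        all_done.wait()

    threads = [threading.Thread(target=producer, args=(i,)) for i in range(num_producers)]
    threads += [threading.Thread(target=consumer, args=(i,)) for i in range(num_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    for event in run_barrier_demo():
        print(event)
```

Regardless of timing, every "consumer finished" entry precedes every "producer released" entry, because no thread passes Barrier.wait() until all parties have reached it. Keeping the barrier outside the queue class matches the suggestion to keep this behavior separate from the automatic latching.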