
Python: Slow generator streaming into fast consumer depletes buffer and terminates early


Say I have a generator that slowly creates my stream of data:

import threading
import time

class SlowStreamSource():
  def __init__(self):
    self.buffer = ""
    self.gen_data = threading.Thread(target=self.generate_stream)
    self.gen_data.start()

  def generate_stream(self):
    i = 0
    while i < 10:
      self.buffer += str(i)
      # Other processing happens
      time.sleep(0.1)
      i += 1
    self.buffer += "-Stream Finished-"

  def read(self, hint = -1):
    if hint is None or hint < 0:
      result = self.buffer
      self.buffer = ""
    else:
      result = self.buffer[:hint]
      self.buffer = self.buffer[hint:]
    return result

This data is sent to a consumer that is much faster than the generator. The consumer follows the standard practice of calling read() until it returns no data, and then exiting:

import time
class FastStreamDestination():
  def __init__(self, source):
    self.source = source

  def process_stream(self):
    while True:
      data = self.source.read()
      if not data:
        break
      print(f'read "{data}"')
      # Other processing happens
      time.sleep(0.05)

(I have no control over the consumer. It's Amazon's boto3 upload_fileobj, but I have reviewed their code to determine that this is essentially how it functions.)
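For comparison, the standard library's in-memory file object io.StringIO follows the same convention that the consumer relies on: read() with no argument returns whatever is left, and an empty string means "end of stream", not "no data yet". A small illustration:

```python
import io

# io.StringIO follows the file-object contract the consumer relies on:
# read() returns everything that is left, then "" once the stream is exhausted.
stream = io.StringIO("0123")
first = stream.read()   # "0123"
second = stream.read()  # "" signals end of stream
print(repr(first), repr(second))  # prints '0123' ''
```

So a source that returns "" while its producer is still working looks exactly like a finished stream to this kind of consumer.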

When I feed my generator into my consumer, the consumer very quickly depletes the buffer, gets an empty string back from read(), concludes that the stream is finished, and exits prematurely.

src = SlowStreamSource()
dst = FastStreamDestination(src)
dst.process_stream()

prints only read "0", but I need something like:

read "0"
read "1"
read "2"
read "3"
read "4"
read "5"
read "6"
read "7"
read "8"
read "9"
read "-Stream Finished-"

Is there any way to ensure that my consumer reads the entire stream from my generator, given that I cannot meaningfully speed up the generator or modify the consumer in any way?


Solution

  • Ok, with some help from a co-worker, I think I have the solution.

    My generator knows whether there is more data for it to provide, even if that data isn't ready yet. Since it's a file-like object, it can expose a closed flag, which the generator thread sets once I'm sure all the data has been generated.

    With that awareness, I can make the read function block as long as it needs in order to ensure that it has some data to return.

    import threading
    import time
    
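    class SlowStreamSource():
      def __init__(self):
        self.buffer = ""
        # Set before the thread starts so it can't overwrite the generator's final update
        self.closed = False
        # Guards self.buffer, which both threads modify
        self.lock = threading.Lock()
        self.gen_data = threading.Thread(target=self.generate_stream)
        self.gen_data.start()
    
      def generate_stream(self):
        i = 0
        while i < 10:
          with self.lock:
            self.buffer += str(i)
          # Other processing happens
          time.sleep(0.1)
          i += 1
        with self.lock:
          self.buffer += "-Stream Finished-"
          # Signal that no more data will ever arrive
          self.closed = True
    
      def read(self, hint = -1):
        # Block until there is data to return or the stream is closed,
        # so an empty result only ever means end of stream
        while not self.closed and len(self.buffer) == 0:
          time.sleep(0.1)
        with self.lock:
          if hint is None or hint < 0:
            result = self.buffer
            self.buffer = ""
          else:
            result = self.buffer[:hint]
            self.buffer = self.buffer[hint:]
        return result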
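The blocking read above polls with time.sleep(). A possible alternative, sketched here rather than taken from the answer, is to let read() wait on a threading.Condition that the producer notifies whenever it appends data or closes the stream, so the reader wakes up as soon as something changes. The class name ConditionStreamSource and its items/delay parameters are hypothetical, chosen for illustration; threading.Condition.wait_for and notify_all are standard library calls.

```python
import threading
import time


class ConditionStreamSource:
    """Hypothetical variant of SlowStreamSource: read() waits on a
    threading.Condition instead of polling with time.sleep()."""

    def __init__(self, items=10, delay=0.1):
        self._buffer = ""
        self.closed = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(
            target=self._generate, args=(items, delay), daemon=True)
        self._thread.start()

    def _generate(self, items, delay):
        for i in range(items):
            with self._cond:
                self._buffer += str(i)
                self._cond.notify_all()  # wake any reader blocked in read()
            time.sleep(delay)  # stands in for "other processing"
        with self._cond:
            self._buffer += "-Stream Finished-"
            self.closed = True  # no more data will ever arrive
            self._cond.notify_all()

    def read(self, hint=-1):
        with self._cond:
            # Block until there is data or the producer has closed the stream;
            # only then may an empty result be returned (meaning end of stream).
            self._cond.wait_for(lambda: self._buffer or self.closed)
            if hint is None or hint < 0:
                result, self._buffer = self._buffer, ""
            else:
                result, self._buffer = self._buffer[:hint], self._buffer[hint:]
            return result


# Drive it with a loop shaped like FastStreamDestination.process_stream
chunks = []
src = ConditionStreamSource(items=10, delay=0.01)
while True:
    data = src.read()
    if not data:
        break
    chunks.append(data)
print("".join(chunks))  # prints 0123456789-Stream Finished-
```

How the data is split into chunks depends on thread timing, but the concatenated result is always the full stream, because read() can only return "" after the producer has appended its last piece and set closed.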