Tags: python, flask, werkzeug

Flask: Streaming data by writing to client?


I have existing code that serializes data to a file-like object:

def some_serialization_function(file):
    file.write(...)

In Flask, I would like to be able to send the serialized data directly to the client, without buffering it in memory first.

I looked at ResponseStreamMixin from werkzeug, but I don't think it can work without buffering:

class StreamResponse(flask.Response, werkzeug.wrappers.ResponseStreamMixin):
   pass

@app.route("/data")
def get_data():
   r = StreamResponse()
   some_serialization_function(r.stream) # everything is buffered into memory
   return r # buffered data is sent after return

All the examples for streaming data that I found are based on generators, which work in the opposite direction (i.e., data is "pulled" from the generator, not "pushed out" via a write call). So I wonder: is there a way to "write" directly to the client in Flask?

EDIT - to be more clear: I'm looking for a way to serve the data generated by "some_serialization_function(...)" (which I cannot easily change) without the memory/IO overhead of having that function write all the data to a buffer/file first.

(I suspect that a tempfile will be the way to go in the end, since the IO overhead will not be significant in comparison to the overhead of actually sending the data over the network. Also my main concern is memory overhead).


Solution

  • You can create a special file-like object that feeds the generator that streams out to the client. Here is a quick & dirty implementation using a queue:

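    from queue import Queue
    
    class StreamWriter(object):
        # Unique end-of-stream marker; unlike a string such as '~',
        # it can never collide with real data written by the serializer.
        _EOF = object()
    
        def __init__(self):
            self.queue = Queue()
    
        def write(self, data):
            # Called by the producer (the serialization function).
            self.queue.put(data)
    
        def read(self):
            # Blocks until the producer has written a chunk or closed the stream.
            data = self.queue.get()
            self.queue.task_done()
            if data is self._EOF:
                return None
            return data
    
        def close(self):
            self.queue.put(self._EOF)  # indicate EOF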
    

    This is nothing more than a producer-consumer queue: write() puts chunks on it from one thread, and read() blocks until a chunk written by another thread is available.
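
One caveat, since memory is the main concern in the question: an unbounded Queue will happily hold every chunk if the writer is faster than the reader. A rough sketch of how a bounded queue limits that, using only the standard library (the limit of 2 and the chunk names are made up for the demo):

```python
import queue
import threading
import time

q = queue.Queue(maxsize=2)   # hypothetical limit: at most 2 chunks waiting
completed_puts = []

def producer():
    for i in range(5):
        q.put(f"chunk-{i}")       # blocks while the queue is full
        completed_puts.append(i)

worker = threading.Thread(target=producer)
worker.start()

time.sleep(0.2)                   # nobody is reading yet
in_flight = len(completed_puts)   # never more than maxsize

received = [q.get() for _ in range(5)]  # reading unblocks the producer
worker.join()
```

With a bounded queue the producer simply stalls in put() until the consumer catches up, so memory use stays proportional to maxsize rather than to the size of the serialized data.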

    Now you can stream a response using a generator. The following example shows a generator that takes the serialization function as an argument. The serialization function is executed in a background thread and receives the file-like object as an argument.

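    import threading
    
    def generate_response(serialize):
        file = StreamWriter()
        def serialize_task():
            try:
                serialize(file)
            finally:
                # Always signal EOF, even if serialization raises;
                # otherwise the generator below would block forever.
                file.close()
        threading.Thread(target=serialize_task).start()
        while True:
            chunk = file.read()
            if chunk is None:
                break
            yield chunk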
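
To try the whole idea without a running Flask app, here is a self-contained sketch of the same approach. It is not the exact code above: `BoundedStreamWriter`, `max_chunks`, and `fake_serializer` are names invented for this example, and the fake serializer stands in for some_serialization_function.

```python
import queue
import threading

_EOF = object()  # unique end-of-stream marker, cannot collide with data


class BoundedStreamWriter:
    """File-like object whose write() feeds a queue (hypothetical helper)."""

    def __init__(self, max_chunks=8):
        # Assumption: a small bound is enough to keep both threads busy.
        self._queue = queue.Queue(maxsize=max_chunks)

    def write(self, data):
        self._queue.put(data)  # blocks while the reader is behind
        return len(data)

    def close(self):
        self._queue.put(_EOF)

    def chunks(self):
        # Yield chunks in the order they were written, until EOF.
        while True:
            item = self._queue.get()
            if item is _EOF:
                return
            yield item


def generate_response(serialize, max_chunks=8):
    writer = BoundedStreamWriter(max_chunks)

    def task():
        try:
            serialize(writer)
        finally:
            writer.close()  # signal EOF even if serialize() raises

    # daemon=True so a stalled producer cannot keep the process alive
    threading.Thread(target=task, daemon=True).start()
    yield from writer.chunks()


def fake_serializer(file):
    # Stand-in for some_serialization_function from the question.
    for i in range(5):
        file.write(f"row {i}\n")


result = list(generate_response(fake_serializer))
```

In a Flask view the generator would be handed to the response object, e.g. `return flask.Response(generate_response(some_serialization_function), mimetype="application/octet-stream")`; the mimetype here is only a guess at what the data is. One thing this sketch does not handle: if the client disconnects mid-stream, the producer thread stays blocked in write() once the queue fills up, so real code would need some way to cancel it.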
    

    I hope this helps!