Tags: python, flask, python-requests, server-sent-events

Using SSE to pass on data from other APIs in real time


I want to build a real-time visualization in which a Flask application sends server-sent events (SSE) to an HTML page, which then renders them. The visualization itself works fine; my confusion is about the SSE part. Another system pings my application at '/data' to hand over data, which I then want to pass on to the visualization through the event stream.

import time, json
from flask import Flask, request, Response, render_template

app =Flask(__name__)

def data_stream(data):
    if data:
        yield 'data: {}\n\n'.format(json.dumps(data))
    else:
        yield 'data: {}\n\n'.format(json.dumps({'data': [{'lat':0, 'lg':0}]}))

@app.route('/data', methods =['GET','POST'])
def collect_data():
    data = {"data": [request.get_json()]}
    data_stream(data)
    return 'asd'


@app.route('/my_event_source', methods =['GET', 'POST'])
def sse_request():
    return Response(data_stream(None), mimetype='text/event-stream')

@app.route('/')
def page():
    return render_template('map_vis.html')

if __name__ == '__main__':
    data2 = []
    app.debug =True
    app.run('0.0.0.0', 8081)

I can't figure out how to get the incoming data into the data_stream generator that is passed to Response. As it stands, the response only ever comes from data_stream(None), so all I get back is {'lat':0, 'lg':0}.


Solution

  • It looks like you're using SSE as if it were long polling or another AJAX-derived technique. SSE is more useful as a stream: once the SSE connection is established, you need to keep the data-sending function alive and have it send data whenever new data arrives.

    So I would do this:

    import queue  # this module is named Queue in Python 2
    data_queue = queue.Queue()
    
    def data_stream():
        while True:
            data = data_queue.get()
            yield 'data: {}\n\n'.format(json.dumps(data))
    

    To send data through SSE, I would just put it in the queue:

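    @app.route('/my_event_source', methods=['GET', 'POST'])
    def sse_request():
        r = Response(data_stream(), mimetype='text/event-stream')
        # Queue an initial placeholder event for the newly opened stream.
        data_queue.put({'data': [{'lat':0, 'lg':0}]})
        return r
    
    @app.route('/data', methods=['GET', 'POST'])
    def collect_data():
        # Hand the incoming payload to the stream through the queue.
        data_queue.put({"data": [request.get_json()]})
        ...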
    

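    The only catch is that, written this way, the while loop blocks on data_queue.get() for as long as the client stays connected, and on a server that handles one request at a time this blocks every other request, including the POSTs to '/data'. So you need something that can serve connections concurrently, such as gevent or Eventlet. There are lots of examples of how to combine Flask with those.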
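
As a rough sketch of the queue-fed stream described above, the following uses only the standard library so it can be tried without a server. It goes slightly beyond the answer in two ways, both of which are assumptions on my part: each connected client gets its own queue (with a single shared queue, each item is delivered to only one of the open streams), and the generator emits an SSE comment line as a keep-alive when nothing arrives for a while. The names `Broadcaster`, `format_sse`, `event_stream` and the `keepalive` parameter are hypothetical and do not come from Flask or from the code above.

```python
import json
import queue
import threading


class Broadcaster:
    """Hypothetical helper: copies each published item to every subscriber's queue."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []

    def subscribe(self):
        q = queue.Queue()
        with self._lock:
            self._subscribers.append(q)
        return q

    def unsubscribe(self, q):
        with self._lock:
            if q in self._subscribers:
                self._subscribers.remove(q)

    def publish(self, data):
        # Snapshot the list so put() is not called while holding the lock.
        with self._lock:
            subscribers = list(self._subscribers)
        for q in subscribers:
            q.put(data)


def format_sse(data):
    """Serialize one payload as a single SSE 'data:' event."""
    return 'data: {}\n\n'.format(json.dumps(data))


def event_stream(broadcaster, keepalive=15.0):
    """Return a generator that yields SSE events for one client."""
    # Subscribe eagerly so items published before the first next() are kept.
    q = broadcaster.subscribe()

    def generate():
        try:
            while True:
                try:
                    data = q.get(timeout=keepalive)
                except queue.Empty:
                    # Lines starting with ':' are comments in the SSE format.
                    yield ': keep-alive\n\n'
                    continue
                yield format_sse(data)
        finally:
            # Runs when the generator is closed, e.g. after a disconnect.
            broadcaster.unsubscribe(q)

    return generate()
```

In the Flask app, the SSE route would presumably return `Response(event_stream(broadcaster), mimetype='text/event-stream')` and the '/data' route would call `broadcaster.publish({"data": [request.get_json()]})`, with `broadcaster` being a module-level `Broadcaster()`. Each open stream still occupies a thread or greenlet while it waits, so the point about needing a concurrent server applies unchanged.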