Search code examples
pythonwebsocketrx-pysanic

How to websocket.send() data from an async function


I'm learning asyncio and trying to figure out how to pass data from one async function to a websocket loop in another async function.

In my scenario, data is POSTed to web API by some third party. I'd like to echo the POST data to connected websocket clients.

Sanic & Rx aren't requirements, but that's the path I've started down. Here's what I've come up with so far:

#!/usr/bin/env python

from sanic import Sanic
from sanic import response
from sanic.response import file
from rx import Observable

app = Sanic(__name__)

@app.route('/')
async def index(request):
    return await file('ws.html')

async def observable_message(message):
    return Observable.from_(message)

@app.route('/post', methods=["POST"])
async def message_inbound(request):
    payload = request.json
    await observable_message(payload)
    return response.json({"status": "OK"})

@app.websocket('/feed')
async def feed(request, ws):
    while True:
        message = await app.stream
        print('Sending: ' + message)
        await ws.send(message)

@app.listener('before_server_start')
async def setup_observable_stream(app, loop):
    app.stream = Observable.start_async(observable_message)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=9000, workers=4, debug=True)

This clearly won't work because observable_message() requires the message as an arg, and I'm trying to use it to start_async(), so I'm stumped. How can I wire these things up?

The client side can be trivial:

<!DOCTYPE html>
<html>
<head><title>POST data</title></head>
<body>
<script>
    var ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/feed'),
        messages = document.createElement('ul');
    ws.onmessage = function (event) {
        var messages = document.getElementsByTagName('ul')[0],
            message = document.createElement('li'),
            content = document.createTextNode('Received: ' + event.data);
        message.appendChild(content);
        messages.appendChild(message);
    };
    document.body.appendChild(messages);
</script>
</body>
</html>

Solution

  • There may be a better way to get it working with Rx, but I found that simplifying helped reason through it. This works:

    #!/usr/bin/env python
    
    import asyncio
    import uvloop
    
    from sanic import Sanic
    from sanic import response
    from sanic.response import file
    from sanic.websocket import ConnectionClosed
    
    app = Sanic(__name__)
    
    app.ws_clients = set()
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    @app.route('/')
    async def index(request):
        return await file('ws.html')
    
    @app.route('/post', methods=["POST"])
    async def message_inbound(request):
        payload = request.body.decode("utf-8")
        await broadcast(payload)
        return response.json({"status": "OK"})
    
    async def broadcast(message):
        broadcasts = [ws.send(message) for ws in app.ws_clients]
        for result in asyncio.as_completed(broadcasts):
            try:
                await result
            except ConnectionClosed:
                print("ConnectionClosed")
            except Exception as ex:
                template = "An exception of type {0} occurred. Arguments:\n{1!r}"
                message = template.format(type(ex).__name__, ex.args)
                print(message)
    
    @app.websocket("/ws")
    async def websocket(request, ws):
        app.ws_clients.add(ws)
        await ws.send("Connected.")
        print(f'{len(app.ws_clients)} clients')
        while True:
            data = await ws.recv()
            print('Received: ' + data)
    
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=9005, workers=1, debug=False)