I'm learning asyncio and trying to figure out how to pass data from one async function to a websocket loop in another async function.
In my scenario, data is POSTed to a web API by some third party. I'd like to echo the POST data to connected websocket clients.
Sanic & Rx aren't requirements, but that's the path I've started down. Here's what I've come up with so far:
#!/usr/bin/env python
from sanic import Sanic
from sanic import response
from sanic.response import file
from rx import Observable
# Single Sanic application object; the route, websocket and listener
# decorators that follow all register their handlers on it.
app = Sanic(__name__)
@app.route('/')
async def index(request):
    """Answer requests for ``/`` with the contents of ``ws.html``."""
    page = await file('ws.html')
    return page
async def observable_message(message):
    """Build an Rx observable from ``message`` with ``Observable.from_`` and return it."""
    stream = Observable.from_(message)
    return stream
@app.route('/post', methods=["POST"])
async def message_inbound(request):
    """Handle a POST to ``/post``: pass its JSON body to ``observable_message`` and acknowledge."""
    # The observable returned by observable_message is awaited and then
    # discarded; nothing in this handler stores or subscribes to it.
    await observable_message(request.json)
    ack = {"status": "OK"}
    return response.json(ack)
@app.websocket('/feed')
async def feed(request, ws):
    """Websocket handler for ``/feed``: repeatedly await ``app.stream`` and send the result to the client."""
    # The loop has no exit condition of its own.
    while True:
        # NOTE(review): app.stream is assigned once by the before_server_start
        # listener; whether awaiting it again yields a *new* message each time
        # depends on Rx semantics not visible here -- confirm.
        message = await app.stream
        # Concatenation assumes message is a str -- TODO confirm.
        print('Sending: ' + message)
        await ws.send(message)
@app.listener('before_server_start')
async def setup_observable_stream(app, loop):
    """Assign an observable to ``app.stream`` before the server starts."""
    # NOTE(review): observable_message is declared with a required ``message``
    # parameter but is handed to start_async uncalled, so no message is
    # supplied at this point -- this is the wiring problem the surrounding
    # text describes.
    app.stream = Observable.start_async(observable_message)
if __name__ == '__main__':
    # Launch the server only when this file is executed directly.
    server_options = dict(host='0.0.0.0', port=9000, workers=4, debug=True)
    app.run(**server_options)
This clearly won't work because observable_message() requires the message as an arg, and I'm trying to use it to start_async(), so I'm stumped. How can I wire these things up?
The client side can be trivial:
<!DOCTYPE html>
<html>
  <head><title>POST data</title></head>
  <body>
    <script>
      // Build the socket URL from the page's own location: location.host
      // already carries the port (when there is one), and the scheme follows
      // http/https so the page also works when served over TLS.
      var scheme = location.protocol === 'https:' ? 'wss://' : 'ws://',
          ws = new WebSocket(scheme + location.host + '/feed'),
          messages = document.createElement('ul');

      // Append every incoming frame to the list created above. The handler
      // uses that element directly instead of looking up "the first <ul>" in
      // the document on each message.
      ws.onmessage = function (event) {
        var message = document.createElement('li'),
            content = document.createTextNode('Received: ' + event.data);
        message.appendChild(content);
        messages.appendChild(message);
      };

      document.body.appendChild(messages);
    </script>
  </body>
</html>
There may be a better way to get it working with Rx, but I found that simplifying helped reason through it. This works:
#!/usr/bin/env python
import asyncio
import uvloop
from sanic import Sanic
from sanic import response
from sanic.response import file
from sanic.websocket import ConnectionClosed
app = Sanic(__name__)
# Websocket connections registered by the /ws handler; broadcast() sends to
# every member of this set.
app.ws_clients = set()
# Install uvloop's event loop policy as the asyncio event loop policy.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
@app.route('/')
async def index(request):
    """Answer requests for ``/`` with the contents of ``ws.html``."""
    page = await file('ws.html')
    return page
@app.route('/post', methods=["POST"])
async def message_inbound(request):
    """Relay the UTF-8 decoded body of a POST to ``/post`` through ``broadcast``, then acknowledge."""
    await broadcast(request.body.decode("utf-8"))
    ack = response.json({"status": "OK"})
    return ack
async def broadcast(message):
    """Send ``message`` to every websocket in ``app.ws_clients``.

    All sends run concurrently. A failed send is reported on stdout and never
    propagates to the caller. Sockets whose send raised ``ConnectionClosed``
    are removed from ``app.ws_clients`` so later broadcasts do not keep
    retrying dead connections.

    Args:
        message: Payload handed unchanged to each socket's ``send``.
    """
    # Snapshot the set: it can change while the sends below are awaited.
    clients = list(app.ws_clients)
    if not clients:
        return

    # return_exceptions=True keeps one failing client from aborting the rest,
    # and gather returns results in input order, so each outcome can be
    # matched back to the socket that produced it.
    outcomes = await asyncio.gather(
        *(ws.send(message) for ws in clients),
        return_exceptions=True,
    )

    for ws, outcome in zip(clients, outcomes):
        if isinstance(outcome, ConnectionClosed):
            print("ConnectionClosed")
            app.ws_clients.discard(ws)
        elif isinstance(outcome, Exception):
            # Built under its own name so the ``message`` parameter is not
            # rebound while reporting the error.
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            print(template.format(type(outcome).__name__, outcome.args))
@app.websocket("/ws")
async def websocket(request, ws):
    """Register a websocket client for broadcasts and drain its incoming frames.

    The socket is added to ``app.ws_clients`` when the handler starts and is
    always removed again when the handler exits, so the set does not
    accumulate connections that have gone away.

    Args:
        request: The request that opened the websocket (unused).
        ws: The websocket connection for this client.
    """
    # NOTE(review): the sample client page shown alongside this server
    # connects to '/feed', not '/ws' -- make the two agree before use.
    app.ws_clients.add(ws)
    try:
        await ws.send("Connected.")
        print(f'{len(app.ws_clients)} clients')
        while True:
            data = await ws.recv()
            print('Received: ' + data)
    finally:
        # Runs however the handler ends (exception or cancellation); discard
        # is a no-op if broadcast() already dropped this socket.
        app.ws_clients.discard(ws)
if __name__ == '__main__':
    # Launch the server only when this file is executed directly.
    server_options = dict(host='0.0.0.0', port=9005, workers=1, debug=False)
    app.run(**server_options)