python websocket async-await receiver

Python - Async callback/receiver for WebSocket


I am trying to implement a WebSocket connection to a server (Python app <=> Django app).

The whole system runs in one big asyncio loop with many tasks. The code snippet below is just a very small test part.

I am able to send any data to the server at any moment, and many of those messages will be of the "request something and wait for a response" kind. But I would also like to have an "always running" handler for all incoming messages. (When something changes in the Django database, I want to send the changes to the Python app.)

How can I include an always-running receiver, or add a callback to the websocket? I have not been able to find any solution for this.

My code snippet:

import asyncio, json, websockets, logging

class UpdateConnection:

    async def connect(self,botName):
        self.sock = await websockets.connect('ws://localhost:8000/updates/bot/'+botName)
        
    async def send(self,data):
        try:
            await self.sock.send(json.dumps(data))
        except websockets.ConnectionClosed:
            logging.info("Websocket connection lost!")
            # Find a way to reconnect... or make the socket reconnect automatically

            
if __name__ == '__main__':
    async def DebugLoop(socketCon):
        await socketCon.connect("dev")
        print("Running..")
        while True:
            data = {"type": "debug"}
            await socketCon.send(data)
            await asyncio.sleep(1)

    uSocket = UpdateConnection()
    # asyncio.run() creates the event loop and runs DebugLoop until it is stopped
    asyncio.run(DebugLoop(uSocket))

After the connection is established, my debug server starts sending random messages to the client at random intervals, and I would like to handle them asynchronously.

Thanks for any help :)


Solution

  • It needn't be that complicated. First of all, I suggest you use the connection patterns offered by the websockets module.

    From the documentation:

    connect() can be used as an infinite asynchronous iterator to reconnect automatically on errors:

    async for websocket in websockets.connect(...):
        try:
            ...
        except websockets.ConnectionClosed:
            continue
    

    Additionally, you can keep the connection alive simply by awaiting incoming messages:

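    my_websocket = None

    async def receive(botName):  # wrapper name is illustrative; "async for" must run inside a coroutine
        global my_websocket
        async for websocket in websockets.connect('ws://localhost:8000/updates/bot/' + botName):
            my_websocket = websocket
            try:
                async for message in websocket:
                    pass  # process each incoming message here
            except websockets.ConnectionClosed:
                pass  # connection dropped; the outer loop reconnects
            finally:
                my_websocket = None  # also unset after a clean close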
    

    As you can see we have a nested loop here:

    1. The outer loop constantly reconnects to the server
    2. The inner loop processes one incoming message at a time

    If you are connected and no messages are coming in from the server, the inner loop simply waits, without blocking the other tasks in the event loop.
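To make the "process incoming messages" step concrete, here is a minimal sketch of a callback registry keyed on each message's "type" field. It assumes the server sends JSON objects shaped like the {"type": "debug"} payload from the question. The names handlers, on, dispatch and handle_debug are hypothetical and not part of the websockets library.

```python
import asyncio
import json

# Hypothetical registry: maps a message "type" to an async handler.
handlers = {}


def on(message_type):
    """Decorator that registers an async handler for one message type."""
    def register(func):
        handlers[message_type] = func
        return func
    return register


async def dispatch(raw):
    """Decode one raw frame and await the matching handler.

    Returns True if a handler ran, False if the frame was not valid JSON
    or no handler is registered for its type.
    """
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return False
    message_type = message.get("type") if isinstance(message, dict) else None
    handler = handlers.get(message_type) if isinstance(message_type, str) else None
    if handler is None:
        return False
    await handler(message)
    return True


@on("debug")
async def handle_debug(message):
    # Assumed handler for the "debug" messages used in the question.
    print("debug message:", message)
```

With something like this in place, the `pass` in the inner loop could become `await dispatch(message)`, so each incoming message is routed to its callback as soon as it arrives.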

    The other thing that happens in the reconnect loop is that my_websocket is set to the active connection and unset again when the connection is lost. In other parts of your script you can use my_websocket to send data. Note that you need to check whether it is currently set wherever you use it:

    async def send(data):
        if my_websocket:
            await my_websocket.send(json.dumps(data))
    

    This is just an illustration; you can also keep the websocket object as an object member, or pass it to another component through a setter function, etc.
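As one possible shape for the "object member" variant, the sketch below folds the reconnecting receive loop and the guarded send into the question's UpdateConnection class. It is a sketch under assumptions, not a definitive implementation: the on_message callback, the connect parameter (there only so a stand-in connection can be injected), the run method and the main coroutine are all hypothetical names, and the guarded import exists only so the class can be exercised where websockets is not installed.

```python
import asyncio
import json

try:
    import websockets
    from websockets import ConnectionClosed
except ImportError:
    # Fallback so the sketch can be exercised with a stand-in connection.
    websockets = None

    class ConnectionClosed(Exception):
        pass


class UpdateConnection:
    def __init__(self, url, on_message, connect=None):
        self.url = url
        self.on_message = on_message  # async callable receiving each decoded message
        self.sock = None              # active connection, or None while disconnected
        self._connect = connect or websockets.connect

    async def run(self):
        """Always-running receiver: reconnects and forwards incoming messages."""
        async for sock in self._connect(self.url):  # outer loop: (re)connect
            self.sock = sock
            try:
                async for raw in sock:              # inner loop: one message at a time
                    await self.on_message(json.loads(raw))
            except ConnectionClosed:
                pass  # connection dropped; the outer loop reconnects
            finally:
                self.sock = None

    async def send(self, data):
        """Send data as JSON; return False if there is no usable connection."""
        sock = self.sock
        if sock is None:
            return False
        try:
            await sock.send(json.dumps(data))
        except ConnectionClosed:
            return False
        return True


async def main():
    async def handle(message):
        print("received:", message)

    conn = UpdateConnection("ws://localhost:8000/updates/bot/dev", handle)
    receiver = asyncio.create_task(conn.run())  # keep a reference to the task
    while True:
        await conn.send({"type": "debug"})
        await asyncio.sleep(1)
```

Running `asyncio.run(main())` would start the receiver as a background task next to the sending loop, so both share the same event loop. Returning False from send instead of raising is a design choice of this sketch; queuing messages until the connection is back would be an equally reasonable alternative.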