Search code examples
pythonwebsocketclientpython-asyncio

Python Websocket Client Sending Messages Periodically but with different time Intervalls


I want to send two different messages to a websocket server but with different time intervalls. For example:

  1. The first message should be send every 2 seconds.
  2. The second message should send every 5 seconds.
async def send_first_message(websocket):
    while True: 
        await websocket.send("FIRST MESSAGE")
        response = await websocket.recv()
        await asyncio.sleep(2)



async def send_second_message():
    while True: 
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:

            asyncio.create_task(send_first_message(websocket))

            while True:
                await websocket.send("SECOND MESSAGE")
                response = await websocket.recv()
                await asyncio.sleep(5)


asyncio.run(send_second_message())

If I run the code like this I get:

"RuntimeError: cannot call recv while another coroutine is already waiting for the next message"

If I comment out one of the "await websocket.recv()" it works fine for a few seconds and then it throws:

"RuntimeError no close frame received or sent"


Solution

  • There's a bit of a disconnect between what you are trying to do in the tasks (synchronous request-response interaction) and what the protocol and the library expects you to do (asynchronous messages).

    When writing asynchronous code, you need to look at what the library/protocol/service expects to be an atomic operation that can happen asynchronously to everything else, and what you want to be a synchronous series of operations. Then you need to find the primitive in the library that will support that. In the case of websockets, the atomic operation is a message being sent in either direction. So you can't expect websockets to synchronize flow over two messages.

    Or to put it another way, you are expecting synchronous responses for each send message, but websockets are not designed to handle interleaved synchronous requests. You've sent a message to the websocket server, and you want to get a response to that message. But you've also sent another message on the same websocket and want a response to that too. Your client websocket library can't differentiate between a message intended for the first request and a message intended for the second request (because from the websocket protocol layer, that is a meaningless concept - so the library enforces this by limiting the recv operations on a websocket that can be blocking to one).

    So ...

    Option 1 - multiple tasks on separate sockets

    From the fact the library limits a websocket to one blocking recv, a primitive in the protocol that meets the requirement is the websocket itself. If these are separate requests that you need separate blocking responses to (so only continue in the requesting task once those responses are available) then you could have separate websocket connections and block for the response in each.

    client1.py

    async def send_first_message():
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
            while True: 
                await websocket.send("FIRST MESSAGE")
                response = await websocket.recv()
                print(response)
                await asyncio.sleep(2)
    
    async def send_second_message():
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
            while True: 
                await websocket.send("SECOND MESSAGE")
                response = await websocket.recv()
                print(response)
                await asyncio.sleep(5)
    
    async def main():
        asyncio.create_task(send_first_message())
        asyncio.create_task(send_second_message())
        await asyncio.Future()
    
    asyncio.run(main())
    

    Option 1 is however not really the websocket or asynchronous way.

    Option 2 - embrace the asynchronous

    To do this on a single websocket, you will need to receive the response asynchronous to both sending tasks.

    If you don't actually care that the send_* functions get the response, you can do this easily...

    client2.py

    async def send_first_message(websocket):
        while True: 
            await websocket.send("FIRST MESSAGE")
            await asyncio.sleep(2)
    
    async def send_second_message(websocket):
        while True:
            await websocket.send("SECOND MESSAGE")
            await asyncio.sleep(5)
            
    async def receive_message(websocket):
        while True:
            response = await websocket.recv()
            print(response)
    
    async def main():
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
            asyncio.create_task(send_first_message(websocket))
            asyncio.create_task(send_second_message(websocket))
            asyncio.create_task(receive_message(websocket))
            await asyncio.Future()
    
    asyncio.run(main())
    

    Option 3

    But what if you want to line up responses to requests and keep on a single websocket? You need some way of knowing which request any particular response is for. Most web services that need this sort of interaction will have you send an ID in the message to the server, and it will respond once a response is ready using the ID as a reference.

    There's also a way of getting your message tasks to block and wait for the response with the right ID by queuing up the responses and checking them periodically.

    client3.py

    unhandled_responses = {}
    
    async def send_first_message(websocket):
        while True:
            req_id = random.randint(0,65535)
            message = json.dumps({'id': req_id, 'message': 'FIRST MESSAGE'})
            await websocket.send(message)
            response = await block_for_response(req_id)
            print(response)
            await asyncio.sleep(2)
    
    async def send_second_message(websocket):
        while True:
            req_id = random.randint(0,65535)
            message = json.dumps({'id': req_id, 'message': 'SECOND MESSAGE'})
            await websocket.send(message)
            response = await block_for_response(req_id)
            print(response)
            await asyncio.sleep(5)
    
    async def block_for_response(id):
        while True:
            response = unhandled_responses.pop(id, None)
            if response:
                return response
            await asyncio.sleep(0.1)
    
    async def receive_message(websocket):
        while True:
            response = json.loads(await websocket.recv())
            unhandled_responses[response['id']] = response
    
    async def main():
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
            asyncio.create_task(send_first_message(websocket))
            asyncio.create_task(send_second_message(websocket))
            asyncio.create_task(receive_message(websocket))
            await asyncio.Future()
    
    asyncio.run(main())
    

    For completeness, the server code the clients were talking to in my tests.

    server.py

    import asyncio
    import websockets
    
    async def server_endpoint(websocket):
        try:
            while True:
                recv_msg = await websocket.recv()
                response = recv_msg
                await websocket.send(response)
        except Exception as ex:
            print(str(ex))
    
    async def main():
        async with websockets.serve(server_endpoint, "localhost", 8765):
            await asyncio.Future()  # run forever
    
    if __name__ == "__main__":
        asyncio.run(main())