Search code examples
websocketpython-asyncio

How to break out of an (asyncio) websocket fetch loop that doesn't have any incoming messages?


This code prints all messages from a websocket connection:

class OrderStreamer:
    def __init__(ᬑ):
        ᬑ.terminate_flag = False

        # worker thread to receive data stream
        ᬑ.worker_thread = threading.Thread(
            target=ᬑ.worker_thread_func,
            daemon=True
            )

    def start_streaming(ᬑ, from_scheduler = False):
        ᬑ.worker_thread.start()


    def terminate(ᬑ):
        ᬑ.terminate_flag = True


    def worker_thread_func(ᬑ):
        asyncio.run(ᬑ.aio_func())  # blocks


    async def aio_func(ᬑ):
        async with \
                aiohttp.ClientSession() as session, \
                session.ws_connect(streams_url) as wsock, \
                anyio.create_task_group() as tg:

            async for msg in wsock:
                print(msg.data)

                if ᬑ.terminate_flag:
                    await wsock.close()

The problem is that if no messages arrive, the loop never gets the chance to check terminate_flag and never exits.

I tried creating an external reference to the runloop and websocket:

        async with \
                aiohttp.ClientSession() as session, \
                session.ws_connect(streams_url) as wsock, \
                anyio.create_task_group() as tg:

            ᬑ.wsock = wsock
            ᬑ.loop = asyncio.get_event_loop()

... and modifying my terminate function:

    def terminate(ᬑ):
        # ᬑ.loop.stop()
        asyncio.set_event_loop(ᬑ.loop)

        async def kill():
            await ᬑ.wsock.close()

        asyncio.run(kill())

... but it does not work.

I can't afford to rearchitect my entire application to use asyncio at this point in time.

How to break out of the loop?


Solution

  • You should use asyncio.wait_for or asyncio.wait and call wsock.__anext__() directly instead of using async for loop.

    The loop with asyncio.wait should look something like this:

    next_message = asyncio.create_task(wsock.__anext__())
    
    while not self.terminate_flag:
        await asyncio.wait([next_message], timeout=SOME_TIMEOUT,)
        if next_message.done():
            try:
                msg = next_message.result()
            except StopAsyncIteration:
                break
            else:
                print(msg.data)
                next_message = asyncio.create_task(wsock.__anext__())
    

    SOME_TIMEOUT should be replaced with the amount of seconds you want to wait continuously for the next incoming message

    Here is the documentation for asyncio.wait

    P.S. I replaced with self, but I hope you get the idea