Tags: python, websocket, redis, fastapi, starlette

How to run loops of blocking operations concurrently using Python asyncio?


I have two unrelated blocking operations that listen to different events. When either of them returns, I need to handle the underlying event it raised.

For some reason, no matter how I schedule them using asyncio, I never get them to run concurrently. receive_json() seems to block indefinitely whenever the other loop is running, so I suspect a concurrency problem with the websocket or the event loop, but I cannot pinpoint what it is or how to solve it.

My current code is illustrated in the simplified example below, but I've also tried other asyncio approaches, such as running them in a single loop, using timeouts, or using asyncio.wait(), without any more success.

The stack is uvicorn as the ASGI server, FastAPI for the web interface, Redis pub/sub (redis-py connector) as one awaitable, and the Starlette WebSocket as the other. They run in a Docker container hosted on a Windows machine, if that's of any interest.


async def await_redis(p):
    return str(p.get_message(timeout=None))

@router.websocket('/')
async def ws_endpoint(websocket: WebSocket):
    async def ws_loop():
        while True:
            data = await websocket.receive_json() # Blocks here whenever rd_loop runs
            messages = await handler(data)
            r.publish('some-channel', messages)

    async def rd_loop():
        r = Redis('host')
        p = r.pubsub('some-channel')
        while True:
            mess = await await_redis(p)
            if mess:
                await websocket.send_json([mess])
    # The strange thing is if rd_loop exits because of exception,
    # ws_loop starts to receive and handle messages.
    await asyncio.gather(ws_loop(), rd_loop()) 

Solution

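  • The await_redis function blocks the event loop. The get_message() method from redis-py is a synchronous call, and wrapping it in an async def does not make it non-blocking: while it waits for a message, no other coroutine (including ws_loop) can run. Try an asyncio-native client such as aioredis instead of redis-py. (aioredis has since been merged into redis-py as redis.asyncio, so recent redis-py versions expose a very similar API.)

    First install it with pip install aioredis; here is the modified code: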

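    import asyncio

    import aioredis  # assumes the aioredis 2.x API
    from fastapi import APIRouter, WebSocket

    router = APIRouter()

    async def await_redis(p):
        # Awaiting get_message() hands control back to the event loop while waiting.
        message = await p.get_message(ignore_subscribe_messages=True, timeout=1.0)
        # Return None (not the string 'None') when nothing arrived.
        return str(message) if message else None

    @router.websocket('/')
    async def ws_endpoint(websocket: WebSocket):
        await websocket.accept()
        # One client shared by both loops, so ws_loop can publish with it.
        r = aioredis.from_url('redis://host')

        async def ws_loop():
            while True:
                data = await websocket.receive_json()
                messages = await handler(data)  # handler() is the asker's own coroutine
                await r.publish('some-channel', messages)

        async def rd_loop():
            p = r.pubsub()  # pubsub() is a plain call, not awaitable
            await p.subscribe('some-channel')
            while True:
                mess = await await_redis(p)
                if mess:
                    await websocket.send_json([mess])

        # Both loops now await non-blocking calls, so gather() can interleave them.
        await asyncio.gather(ws_loop(), rd_loop())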
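
The effect described above can be reproduced without Redis or a websocket. The following is a minimal sketch, not the asker's actual code: blocking_get_message() is a hypothetical stand-in for a synchronous call such as redis-py's get_message(), and ticker() stands in for ws_loop. It also shows one possible alternative to switching libraries, namely pushing the blocking call into a worker thread with asyncio.to_thread() (assumes Python 3.9 or later).

```python
import asyncio
import time


def blocking_get_message(delay: float = 0.5) -> str:
    """Hypothetical stand-in for a synchronous call like redis-py's get_message()."""
    time.sleep(delay)  # blocks whichever thread calls it
    return "redis-message"


async def ticker(log: list, ticks: int = 3, interval: float = 0.05) -> None:
    """Stand-in for ws_loop: records a tick each time the event loop lets it run."""
    for i in range(ticks):
        await asyncio.sleep(interval)
        log.append(f"tick-{i}")


async def blocked_listener(log: list) -> None:
    # The blocking call runs on the event loop thread, so ticker() is frozen
    # until it returns, even though this function is declared async.
    log.append(blocking_get_message())


async def threaded_listener(log: list) -> None:
    # The blocking call runs in a worker thread; the event loop stays free
    # to keep scheduling ticker() in the meantime.
    log.append(await asyncio.to_thread(blocking_get_message))


async def run(listener) -> list:
    """Run ticker() and the given listener together and return the event order."""
    log: list = []
    await asyncio.gather(ticker(log), listener(log))
    return log


if __name__ == "__main__":
    print("blocking :", asyncio.run(run(blocked_listener)))
    print("threaded :", asyncio.run(run(threaded_listener)))
```

With the blocking listener, the message should be logged before any tick, because the ticker cannot resume until the sleep finishes. With the threaded listener, the ticks should come first. The same to_thread() wrapping could in principle be applied to the original await_redis if staying on synchronous redis-py is preferred, at the cost of one worker thread per waiting call.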