Search code examples
python-asynciofastapipython-multiprocessingpython-anyio

Make multiprocessing.Queue accessible from asyncio


Given a multiprocessing.Queue that is filled from different Python threads, created via ThreadPoolExecutor.submit(...).

How to access that Queue with asyncio / Trio / Anyio in a safe manner (context FastAPI) and reliable manner?

I am aware of Janus library, but prefer a custom solution here.

Asked (hopefully) more concisely:

How to implement the

await <something_is_in_my_multiprocessing_queue>

to have it accesible with async/await and to prevent blocking the event loop?

What synchronization mechanism in general would you suggest?

(Attention here: multiprocessing.Queue not asyncio.Queue)


Solution

  • Actually, I figured it out.

    Given a method, that reads the mp.Queue:

    def read_queue_blocking():
        return queue.get()
    

    Comment: And this is the main issue: A call to get is blocking.

    We can now either

    For FastAPI

    @app.websocket("/ws/{client_id}")
    async def websocket_endpoint(websocket: WebSocket, client_id: str):
        await websocket.accept()
        while True:
            import anyio
            queue_result = await anyio.to_thread.run_sync(read_queue_blocking)
            await websocket.send_text(f"Message text was: {queue_result}")