Given a multiprocessing.Queue that is filled from different Python threads, created via ThreadPoolExecutor.submit(...).
How can I access that Queue from asyncio / Trio / AnyIO in a safe and reliable manner (context: FastAPI)?
I am aware of the Janus library, but would prefer a custom solution here.
Asked (hopefully) more concisely:
How can I implement
await <something_is_in_my_multiprocessing_queue>
so that it is accessible with async/await and does not block the event loop?
What synchronization mechanism in general would you suggest?
(Attention here: multiprocessing.Queue, not asyncio.Queue)
Actually, I figured it out.
Given a function that reads the mp.Queue:
def read_queue_blocking():
    return queue.get()
Comment: this is the main issue. A call to get blocks the calling thread until an item is available.
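To make the problem concrete, here is a small sketch using only the standard library. The names demo_blocking and ticker are made up for illustration and are not part of the original setup. It calls get directly inside a coroutine on an empty queue (with a timeout so that it returns eventually), and a background task that is supposed to tick every 10 ms does not run at all while that call is blocking.

```python
import asyncio
import multiprocessing
import queue as queue_module  # only needed for the Empty exception


async def demo_blocking() -> int:
    """Return how often the ticker ran while get() was blocking."""
    mp_queue = multiprocessing.Queue()
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start

    try:
        # Blocking call made directly on the event loop thread:
        # no other task can run until it returns or times out.
        mp_queue.get(timeout=0.3)
    except queue_module.Empty:
        pass

    ticks_while_blocked = ticks
    task.cancel()
    return ticks_while_blocked


if __name__ == "__main__":
    print(asyncio.run(demo_blocking()))
```

In a FastAPI application the same stall would affect every other request and WebSocket handled by that event loop.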
We can now use
await anyio.to_thread.run_sync(...)
to execute the blocking retrieval of data from the queue in a separate thread. For FastAPI:
import anyio
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    while True:
        # Run the blocking queue.get() in a worker thread so the event loop stays free.
        queue_result = await anyio.to_thread.run_sync(read_queue_blocking)
        await websocket.send_text(f"Message text was: {queue_result}")
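For plain asyncio without AnyIO, roughly the same idea can be sketched with the standard library. This is a minimal sketch, not a definitive implementation: async_get and poll_interval are hypothetical names, and asyncio.to_thread requires Python 3.9 or newer. It additionally passes a timeout to get and retries in a loop, on the assumption that a worker thread parked in an untimed get() cannot be interrupted from the event loop; with a timeout the thread is released at regular intervals.

```python
import asyncio
import multiprocessing
import queue as queue_module  # for the Empty exception
from concurrent.futures import ThreadPoolExecutor


async def async_get(mp_queue, poll_interval: float = 0.5):
    """Await the next item of a multiprocessing.Queue without blocking the loop."""
    while True:
        try:
            # The blocking get() runs in a worker thread; the timeout makes
            # sure that thread returns again after poll_interval seconds.
            return await asyncio.to_thread(mp_queue.get, timeout=poll_interval)
        except queue_module.Empty:
            continue  # nothing arrived yet, poll again


async def main() -> str:
    mp_queue = multiprocessing.Queue()
    # Producer thread submitted via ThreadPoolExecutor, as in the question.
    with ThreadPoolExecutor() as pool:
        pool.submit(mp_queue.put, "hello")
    return await async_get(mp_queue)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

One caveat of this design: if the awaiting task is cancelled while a worker thread is still inside get, an item that thread fetches afterwards is discarded, so a shorter poll_interval only narrows that window rather than closing it.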