python, design-patterns, python-asyncio

Architecture/design pattern for communication between asyncio and non-asyncio contexts


I am writing a TCP-based client that does both req/rep and push/pull communication. I tried to use asyncio and its transport/protocol structure to handle the low-level transport work, but the API that others will use does not run in an asyncio context. So I created the following architecture: it runs asyncio in a thread, uses asyncio.run_coroutine_threadsafe to submit a request, and waits for the response using a future.

import asyncio as aio
from threading import Thread

class MyProtocol(aio.Protocol):
    ...

async def aio_main(drv, ip, port):
    loop = aio.get_running_loop()
    on_connection_lost = loop.create_future()
    # create_connection is a coroutine, so it must be awaited
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(drv, on_connection_lost), ip, port
    )
    drv._transport = transport
    drv._protocol = protocol
    drv._loop = loop
    try:
        await on_connection_lost
    finally:
        transport.close()

class Driver:
    def __init__(self):
        self._thread = Thread(target=lambda: aio.run(aio_main(self, '192.168.0.1', 8888)))
        self._thread.start()

    async def _request(self, data):
        loop = aio.get_running_loop()
        fut = loop.create_future()
        self._protocol._rsp = fut
        self._transport.write(data)
        return await fut # which is set in MyProtocol

    def request(self, data):
        coro = self._request(data)
        fut = aio.run_coroutine_threadsafe(coro, self._loop)
        return fut.result(1.0) # 1s timeout

Is this architecture good practice or bad? Is there a better pattern to handle this?


Solution

  • This should work - and is more or less the "one obvious way to do it" as Python folks like to recite.

    The one thing that stands out is that the single ._rsp attribute on the protocol allows only one request in flight at a time. If driver.request is called concurrently (in a multi-threaded app), a later call overwrites the earlier call's Future and the responses get mixed up.

    If you rewrite protocol._rsp as a @property backed by a contextvars.ContextVar instance, so that concurrent tasks running ._request each see a different Future instance, this is overcome and your driver becomes safe for concurrent calls. (OK, we don't have the code for "MyProtocol": any per-request state it holds would also need to be either stored in contextvars or passed along as function-call arguments.)

    Other than that, it is ok!
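
As a rough illustration of the ContextVar idea, here is a minimal sketch with no real networking. `FakeProtocol`, `_rsp_var` and `start_loop_in_thread` are hypothetical names made up for this example, not part of the original code, and the "reply" is faked inside the coroutine. It only shows that each task submitted with run_coroutine_threadsafe gets its own copy of the context, so each one reads back its own Future:

```python
import asyncio
import contextvars
import threading

# Hypothetical per-request slot; this name is not from the original code.
_rsp_var = contextvars.ContextVar("rsp")


class FakeProtocol:
    """Stand-in for MyProtocol that only models the `_rsp` property."""

    @property
    def _rsp(self):
        return _rsp_var.get()

    @_rsp.setter
    def _rsp(self, fut):
        _rsp_var.set(fut)


async def _request(protocol, data):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    protocol._rsp = fut           # stored in this task's context only
    await asyncio.sleep(0.01)     # let the other requests set their own value
    same = protocol._rsp is fut   # still our Future, not another task's
    fut.set_result(data.upper())  # pretend the reply arrived
    return same, await fut


def start_loop_in_thread():
    # Run an event loop in a background thread, as in the question.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


loop, thread = start_loop_in_thread()
protocol = FakeProtocol()

# Submit three overlapping requests from the non-asyncio side.
futures = [
    asyncio.run_coroutine_threadsafe(_request(protocol, d), loop)
    for d in (b"a", b"b", b"c")
]
results = [f.result(1.0) for f in futures]

# Shut the loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
print(results)
```

Keep in mind that this isolation applies to code running inside the request's own task. Transport callbacks such as data_received are not invoked from that task, so they may not see the value it set; in that case the Future probably has to be handed to the protocol explicitly, which is the "passed along as function-call arguments" option mentioned above.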