I am writing a TCP-based client that does both req/rep and push/pull communication. I tried to use asyncio and its transport/protocol structure to handle the low-level transport work. However, the API that others will call does not run in an asyncio context. So I created the following architecture, which runs asyncio in a thread and uses asyncio.run_coroutine_threadsafe to submit a request and wait for the response using a future.
import asyncio as aio
from threading import Thread


class MyProtocol(aio.Protocol):
    ...


async def aio_main(drv, ip, port):
    loop = aio.get_running_loop()
    on_connection_lost = loop.create_future()
    # create_connection is a coroutine, so it has to be awaited
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(drv, on_connection_lost), ip, port
    )
    drv._transport = transport
    drv._protocol = protocol
    drv._loop = loop
    try:
        await on_connection_lost
    finally:
        transport.close()


class Driver:
    def __init__(self):
        # Run the event loop in its own thread
        self._thread = Thread(
            target=lambda: aio.run(aio_main(self, '192.168.0.1', 8888))
        )
        self._thread.start()

    async def _request(self, data):
        loop = aio.get_running_loop()
        fut = loop.create_future()
        self._protocol._rsp = fut
        self._transport.write(data)
        return await fut  # which is set in MyProtocol

    def request(self, data):
        coro = self._request(data)
        fut = aio.run_coroutine_threadsafe(coro, self._loop)
        return fut.result(1.0)  # 1s timeout
Is this architecture good practice or bad? Is there a better pattern to handle this?
This should work - and is more or less the "one obvious way to do it" as Python folks like to recite.
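For reference, here is a minimal, network-free sketch of the same pattern: an event loop in a background thread, with synchronous callers submitting coroutines through asyncio.run_coroutine_threadsafe. The names LoopThread and echo_upper are hypothetical and only stand in for the driver and a real request. One assumption worth flagging: the question's code reads drv._loop from the calling thread, possibly before the loop thread has assigned it, so this sketch waits on a threading.Event until the loop is running.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: owns an event loop running in a background thread."""

    def __init__(self):
        self._ready = threading.Event()
        self._loop = None
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        # Block until the loop is running, so self._loop is never used too early.
        self._ready.wait()

    def _run(self):
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        # Signal readiness from inside the loop's first iteration.
        self._loop.call_soon(self._ready.set)
        self._loop.run_forever()
        self._loop.close()

    def call(self, coro, timeout=1.0):
        # Safe to call from any thread; returns a concurrent.futures.Future.
        fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return fut.result(timeout)

    def stop(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()


async def echo_upper(data: bytes) -> bytes:
    # Stand-in for a real request/response round trip.
    await asyncio.sleep(0.01)
    return data.upper()


if __name__ == "__main__":
    lt = LoopThread()
    try:
        print(lt.call(echo_upper(b"hello")))
    finally:
        lt.stop()
```

The blocking fut.result(timeout) raises a timeout error if the coroutine does not finish in time, which matches the 1 s timeout used in the question.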
What stands out is that this only works if calls are serialized, because the protocol keeps a single ._rsp attribute: if your driver.request is called concurrently (in a multi-threaded app), the responses will get mixed up. If you rewrite protocol._rsp as a @property backed by a contextvars.ContextVar instance, so that concurrent tasks running ._request each see a different Future instance, this would be overcome, and your driver would be good for concurrent calls. (OK, we don't have the code for "MyProtocol": any state it holds for a single request would also need to be either stored in contextvars or passed along as function-call arguments.)
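To illustrate the per-task isolation that a ContextVar gives, here is a small sketch. The names current_rsp and fake_request are made up for the example, and the "response" is set by the task itself rather than by a protocol. It only shows that two concurrent tasks each see their own Future. It does not show how MyProtocol would look the future up from its own callbacks, which are invoked by the event loop rather than by the requesting task.

```python
import asyncio
import contextvars

# Hypothetical variable: the Future belonging to the request of the current task.
current_rsp = contextvars.ContextVar("current_rsp")


async def fake_request(tag: str) -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    current_rsp.set(fut)       # only visible in this task's context
    await asyncio.sleep(0.01)  # let the other task run and set its own value
    # Still our own future, even though another task called set() meanwhile.
    assert current_rsp.get() is fut
    current_rsp.get().set_result(tag)  # stand-in for the protocol's reply
    return await fut


async def main():
    # gather() wraps each coroutine in a task with its own copy of the context.
    return await asyncio.gather(fake_request("a"), fake_request("b"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a plain attribute instead of the ContextVar, the second task would overwrite the first task's future, which is exactly the problem described above.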
Other than that, it is ok!