Search code examples
pythonpython-3.xgeneratorpython-asyncioyield-from

What is the correct way to yield from a stream?


I have a Connection type that I’m using to wrap read/write stream pairs from asyncio.

class Connection(object):

    def __init__(self, stream_in, stream_out):
        self._streams_ = (stream_in, stream_out)

    def read(self, n_bytes: int = -1):
        stream = self._streams_[0]
        return stream.read(n_bytes)

    def write(self, bytes_: bytes):
        stream = self._streams_[1]
        stream.write(bytes_)
        yield from stream.drain()

When a new client connects to the server, new_connection will create a new Connection object, and expect to receive 4 bytes.

@asyncio.coroutine
def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

The client sends 4 bytes.

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

This works about as I expect, but I do have to write yield from for every call to read and write. I've tried moving the yield from into Connection for this reason.

def read(self, n_bytes: int = -1):
    stream = self._streams_[0]
    data = yield from stream.read(n_bytes)
    return data

But, instead of the expected data bytes, I get a generator object.

<generator object StreamReader.read at 0x1109983b8>

So, for every call to read and write, I must be careful to have the yield from. My goal is to reduce new_connection to the following.

@asyncio.coroutine
def new_connection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))

Solution

  • Because StreamReader.read is a coroutine, your only options for calling it are a) wrapping it in a Task or Future and running that via an event loop, b) awaiting it from coroutine defined with async def, or c) using yield from with it from a coroutine defined as a function decorated with @asyncio.coroutine.

    Since Connection.read is called from an event loop (via the coroutine new_connection), you can't reuse that event loop to run a Task or Future for StreamReader.read: event loops can't be started while they're already running. You'd either have to stop the event loop (disastrous and probably not possible to do correctly) or create a new event loop (messy and defeating the purpose of using coroutines). Neither of those are desirable, so Connection.read needs to be a coroutine or an async function.

    The other two options (await in an async def coroutine or yield from in a @asyncio.coroutine-decorated function) are mostly equivalent. The only difference is that async def and await were added in Python 3.5, so for 3.4, using yield from and @asyncio.coroutine is the only option (coroutines and asyncio didn't exist prior to 3.4, so other versions are irrelevant). Personally, I prefer using async def and await, because defining coroutines with async def is cleaner and clearer than with the decorator.

    In brief: have Connection.read and new_connection be coroutines (using either the decorator or the async keyword), and use await (or yield from) when calling other coroutines (await conn.read(4) in new_connection, and await self.__in.read(n_bytes) in Connection.read).