Tags: python, async-await, generator, python-trio

Is yielding from inside a nursery in an asynchronous generator function bad?


I was told that the following code is not safe, because it is not allowed to have an asynchronous generator that yields from inside a nursery, except if it is an asynchronous context manager.

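import math
from contextlib import aclosing  # Python 3.10+; older code imported it from async_generator
from typing import AsyncIterable, TypeVar

import trio

T = TypeVar('T')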

async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:
    """Delays each item in source by an interval.

    Received items are temporarily stored in an unbounded queue, along with a timestamp, using
    a background task. The foreground task takes items from the queue, and waits until the
    item is older than the given interval and then yields it."""

    send_channel, receive_channel = trio.open_memory_channel(math.inf)

    async def pull_task():
        async with aclosing(source) as agen:
            async for item in agen:
                send_channel.send_nowait((item, trio.current_time() + interval))

    async with trio.open_nursery() as nursery:
        nursery.start_soon(pull_task)
        async with receive_channel:
            async for item, timestamp in receive_channel:
                now = trio.current_time()
                if timestamp > now:
                    await trio.sleep(timestamp - now)
                yield item

I have trouble understanding how this can possibly break. If anyone can provide example code that uses this exact generator function and demonstrates the problem, it would be greatly appreciated and rewarded.

The goal of the above code is to delay processing of an asynchronous sequence without applying any backpressure. If you can demonstrate that this code does not work as I expect, that would also be appreciated.

Thank you.


Solution

  • Unfortunately, that's correct – yield inside a nursery or cancel scope isn't supported, except in the narrow cases of using @contextlib.asynccontextmanager to create an async context manager or writing an async pytest fixture.

    There are several reasons for this. Some of them are technical: Trio has to keep track of which nurseries/cancel scopes are currently "active" on the stack. When you yield out of one, that breaks the nesting, and Trio has no way to know that you've done this. (There's no way for a library to detect a yield out of a context manager.)

    But there's also a fundamental, unsolvable reason, which is that the whole idea of Trio and structured concurrency is that every task "belongs" to a parent task that can receive notification if the child task crashes. But when you yield in a generator, the generator frame gets frozen and detached from the current task – it might resume in another task, or never resume at all. So when you yield, that breaks the link between all the child tasks in the nursery and their parents. There's just no way to reconcile that with the principles of structured concurrency.

    Over in the Trio chat, Joshua Oreman gave a specific example that breaks in your case:

    if I run the following

    async def arange(*args):
        for val in range(*args):
            yield val
    
    async def break_it():
        async with aclosing(delay(0, arange(3))) as aiter:
            with trio.move_on_after(1):
                async for value in aiter:
                    await trio.sleep(0.4)
                    print(value)
    
    trio.run(break_it)
    

    then I get

    RuntimeError: Cancel scope stack corrupted: attempted to exit
    <trio.CancelScope at 0x7f364621c280, active, cancelled> in <Task
    '__main__.break_it' at 0x7f36462152b0> that's still within its child
    <trio.CancelScope at 0x7f364621c400, active>
    
    This is probably a bug in your code, that has caused Trio's internal
    state to become corrupted. We'll do our best to recover, but from now
    on there are no guarantees.
    
    Typically this is caused by one of the following:
      - yielding within a generator or async generator that's opened a cancel
        scope or nursery (unless the generator is a @contextmanager or
        @asynccontextmanager); see https://github.com/python-trio/trio/issues/638 [...]
    

    By changing the timeouts and delay so that the timeout expired while inside the generator rather than while outside of it, I was able to get a different error also: trio.MultiError: Cancelled(), GeneratorExit() raised out of aclosing()

    There's also a long discussion about all these issues here, which is where we figured out that this just can't be supported: https://github.com/python-trio/trio/issues/264

    It's an unfortunate situation, both because it's a shame that we can't support it, and even worse that it looks like it works in simple cases, so folks can end up writing a lot of code that uses this trick before realizing that it doesn't work :-(

    Our plan is to make the illegal cases give an obvious error immediately when you try to yield, to at least avoid the second problem. But, this will take a while because it requires adding some extra hooks to the Python interpreter.

    It is also possible to create a construct that's almost as easy to write and use as async generators, but that avoids this problem. The idea is that instead of pushing and popping the generator from the stack of the task that's consuming it, you instead run the "generator" code as a second task that feeds the consumer task values. See the thread starting here for more details.