
How to await for a job multiple times in trio?


This is similar to "Can I await the same Task multiple times in Python?", but for trio instead of asyncio. Basically, in trio, how can I await the result of an async function multiple times while actually executing it only once?

E.g., what should the argument be for coro_b and coro_c, which are executed concurrently?

import trio


async def coro_a():
    print("executing coro a")
    return 'a'


async def coro_b(task_a):
    task_a_result = await task_a
    print("from coro_b: ", task_a_result)
    return 'b'


async def coro_c(task_a):
    task_a_result = await task_a
    print("from coro_c: ", task_a_result)
    return 'c'

async def main():
    t_a = coro_a()

    # At some point
    t_b = coro_b(t_a)
    ...
    # At some other point, maybe concurrently to t_b
    t_c = coro_c(t_a)

    b = await t_b
    c = await t_c  # fails as written: t_a was already awaited inside t_b

trio.run(main)

(In my case, the trio root loop is managed by a framework, pyfuse3, and I only need to define my own subclass containing several async functions (which can run concurrently) to implement. So I'm not sure how the underlying calls are made, but I can safely assume they are made correctly. Feel free to fill in the rest if you think a "full" version of this snippet, with a main function, would be useful.)

(I'm familiar with JS promises and with concurrent/parallel concepts and practices in general, but I don't have much experience with asyncio and none with trio in Python.)


Solution

  • Short answer: you don't.

    Trio doesn't have tasks. Given

    import trio

    async def foo(i=21):
        await trio.sleep(i / 10)
        return 2 * i
    

    you never do task = foo(); do_whatever(); result = await task. Not in Trio. That's simply not an idiom Trio supports, for several reasons that are out of scope for this answer. So Trio has no "task" object; it doesn't need one.

    Instead, you always do result = await foo(). There is no task, it's just some code you call (and get the result of). The question of awaiting it multiple times simply doesn't arise.

    If you do want to process a result multiple times, you need a wrapper that saves the result somewhere and then sets a trio.Event. The code that wants the result can then wait on that event and process the result.

    Something like this:

    import trio

    class ResultOf:
        scope: trio.CancelScope = None

        def __init__(self, p, a, k):
            self.evt = trio.Event()
            self.p = (p, a, k)

        async def run_in(self, nursery):
            # start() returns once _wrap has called task_status.started(),
            # handing back the cancel scope passed to it
            self.scope = await nursery.start(self._wrap)

        async def _wrap(self, task_status=trio.TASK_STATUS_IGNORED):
            with trio.CancelScope() as sc:
                task_status.started(sc)
                try:
                    p, a, k = self.p
                    self.res = await p(*a, **k)
                except Exception as e:
                    self.err = e
                except BaseException as e:
                    # Basic rules for handling BaseException in Python:
                    # - you never save them for "recycling" in a different context
                    # - you always re-raise them
                    self.err = RuntimeError("got killed")
                    self.err.__cause__ = e
                    raise
                else:
                    self.err = None
                finally:
                    # wake up everybody waiting in get()
                    self.evt.set()

        def cancel(self):
            self.scope.cancel()

        async def get(self):
            await self.evt.wait()
            if self.err is not None:
                raise self.err
            return self.res

    async def result_of(n, p, *a, **k):
        res = ResultOf(p, a, k)
        await res.run_in(n)
        return res  # hand the promise-like object back to the caller

    async def main():
        async with trio.open_nursery() as n:
            promise = await result_of(n, foo, 2)
            ...
            assert 4 == await promise.get()

    trio.run(main)
    

    There are reasons why this kind of wrapper is not part of Trio's core. Among them: if the code calling promise.get() runs in a scope that gets cancelled, so the result is no longer needed, should that cancellation propagate to foo or not?

    Thus the real answer is probably to refactor your code so that you no longer need this kind of thing in the first place.

    More generally, when converting something to Trio, it's easier to start with sync code and just sprinkle async and await onto it than to start with an asyncio/promise/etc. base and try to remove or convert all those pesky tasks.

    The structure of a language shapes your thinking. Promises and tasks are not part of the Structured Concurrency concept that Trio has been written to adhere to. Read https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ (written by Trio's main author) for more background.