This is similar to "Can I await the same Task multiple times in Python?", but for trio (instead of asyncio).
Basically, in trio, how can I await (the result value of) an async function multiple times, while actually only executing it once?
E.g., what should the argument be for coro_b and coro_c, which are executed in parallel?
async def coro_a():
    print("executing coro a")
    return 'a'

async def coro_b(task_a):
    task_a_result = await task_a
    print("from coro_b: ", task_a_result)
    return 'b'

async def coro_c(task_a):
    task_a_result = await task_a
    print("from coro_c: ", task_a_result)
    return 'c'

async def main():
    t_a = coro_a()
    # At some point
    t_b = coro_b(t_a)
    ...
    # At some other point, maybe concurrently to t_b
    t_c = coro_c(t_a)
    b = await t_b
    c = await t_c

main()
(In my case, the trio root loop is managed by a framework, pyfuse3, and I only need to define my own subclass, which contains several async functions (which can be executed in parallel) to be implemented. So I'm not sure how the underlying calls are made, but I can safely assume they are made correctly. Please feel free to supply the remaining parts if you think it useful to turn this snippet into a "full" version containing a main function.)
(I'm familiar with JS promises, and more so with concurrent/parallel concepts and practices, but I don't have much experience with asyncio and none with trio in Python.)
Short answer: you don't.
Trio doesn't have awaitable tasks. Given
import trio

async def foo(i=21):
    await trio.sleep(i/10)
    return 2*i
you never do task = foo(); do_whatever(); result = await task. Not in trio. That's just not an idiom which Trio likes to support, for multiple reasons which are out of scope for this answer. Thus Trio doesn't have a "task" object. It doesn't need one.
Instead, you always do result = await foo(). There is no task, it's just some code you call (and get the result of). The question of awaiting it multiple times simply doesn't arise.
If you do want to process a result multiple times, you need a wrapper that saves the result somewhere and then sets a trio.Event. The code that wants the result can then wait on that event and process the result.
Something like this:
import trio

class ResultOf:
    scope: trio.CancelScope = None

    def __init__(self, p, a, k):
        self.evt = trio.Event()
        self.p = (p, a, k)  # callable, positional args, keyword args

    async def run_in(self, nursery):
        # nursery.start() returns whatever _wrap passes to task_status.started()
        self.scope = await nursery.start(self._wrap)

    async def _wrap(self, task_status):
        with trio.CancelScope() as sc:
            task_status.started(sc)
            try:
                p, a, k = self.p
                self.res = await p(*a, **k)
            except Exception as e:
                self.err = e
            except BaseException as e:
                # Basic rules for handling BaseException in Python:
                # - you never save them for "recycling" in a different context
                # - you always re-raise them
                self.err = RuntimeError("got killed")
                self.err.__cause__ = e
                raise
            else:
                self.err = None
            finally:
                # wake up everybody who is waiting in get()
                self.evt.set()

    def cancel(self):
        self.scope.cancel()

    async def get(self):
        await self.evt.wait()
        if self.err is not None:
            raise self.err
        return self.res

async def result_of(n, p, *a, **k):
    res = ResultOf(p, a, k)
    await res.run_in(n)
    return res

...

async with trio.open_nursery() as n:
    promise = await result_of(n, foo, 2)
    ...
    assert 4 == await promise.get()
There are reasons this kind of wrapper is not part of the Trio core. Among others: what happens if your promise.get runs in a scope that gets cancelled, so the result isn't needed any more? Should that cancellation propagate to foo or not?
Thus the real answer is probably to refactor your code so that you no longer need this kind of thing in the first place.
More generally, when converting something to Trio, it's easier to start with sync code and just sprinkle async and await onto it than to start with an asyncio/promise/etc. base and try to remove or convert all those pesky tasks.
The structure of a language shapes your thinking. Promises and tasks are not part of the Structured Concurrency concept that Trio has been written to adhere to. Read https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ (written by Trio's main author) for more background.