I am looking for an `asyncio.create_task` equivalent for `AsyncGenerator`. I want the generator to start executing in the background right away, without explicitly awaiting its results.
For example:
async def g():
    """Yield 0, 1 and 2, sleeping one second before each value."""
    for i in range(3):
        # Stand-in for real asynchronous work that precedes each result.
        await asyncio.sleep(1.0)
        yield i
async def main():
    """Iterate two ``g()`` generators back to back, printing elapsed time.

    Creating ``g1`` and ``g2`` does not start them: an async generator only
    advances while it is being iterated, so ``g2`` does not begin its first
    sleep until the loop over ``g1`` has finished.
    """
    g1 = g()
    g2 = g()
    t = time.time()
    async for i in g1:
        print(i, time.time() - t)
    async for i in g2:
        print(i, time.time() - t)
This takes 6 seconds to execute:
0 1.001204013824463
1 2.0024218559265137
2 3.004373788833618
0 4.00572395324707
1 5.007828950881958
2 6.009296894073486
If both generators were executed in parallel, the total execution would take just ~3 seconds. What is the recommended approach here?
This is the approach I came up with:
async def consume(a_iter: AsyncIterator) -> Tuple[Any, Optional[asyncio.Task]]:
    """Fetch the next item from *a_iter* and schedule the fetch after it.

    Returns:
        ``(item, next_task)``, where ``next_task`` is a task already
        scheduled to run the following ``consume`` step, or ``(None, None)``
        once *a_iter* is exhausted. Only ``next_task is None`` signals
        exhaustion, so ``None`` remains a legal item.

    Any exception other than ``StopAsyncIteration`` raised by *a_iter*
    propagates out of this coroutine.
    """
    try:
        item = await a_iter.__anext__()
    except StopAsyncIteration:
        return None, None
    # Schedule the next fetch immediately so the source keeps advancing
    # whether or not anybody has asked for the current item yet.
    return item, asyncio.create_task(consume(a_iter))


def _cancel_pending(task: Optional[asyncio.Task]) -> None:
    """Cancel the unfinished ``consume`` step at the end of the chain from *task*."""
    while task is not None:
        if not task.done():
            task.cancel()
            return
        # NOTE: calling ``exception()`` marks the error as retrieved, so a
        # failure the caller never reached is dropped here instead of being
        # reported when the task is garbage collected.
        if task.cancelled() or task.exception() is not None:
            return
        # A finished step carries its successor in its result.
        _, task = task.result()


def create_generator_task(gen: AsyncGenerator) -> AsyncGenerator:
    """Start draining *gen* in the background and return a generator over it.

    The first ``consume`` task is created right here, so this must be called
    while an event loop is running. The source is advanced as fast as it can
    produce items, independently of how quickly the returned generator is
    iterated; items produced ahead of the reader stay in the results of the
    finished tasks until they are read.

    An exception raised by *gen* is re-raised from the returned generator
    when iteration reaches the failed step. If the returned generator is
    closed before it is exhausted, the fetch still in flight is cancelled so
    the source is not drained for nothing.
    """
    first_step = asyncio.create_task(consume(gen.__aiter__()))

    async def consumer():
        pending = first_step
        try:
            while True:
                item, pending = await pending
                if pending is None:
                    # ``consume`` reported exhaustion with (None, None).
                    return
                yield item
        finally:
            # No-op after normal exhaustion (``pending`` is None); stops the
            # background chain on early close, error or cancellation.
            _cancel_pending(pending)

    return consumer()
Unless there is an official solution in the standard library that I am missing, this method works pretty well. Example:
async def main():
    """Iterate two background-started ``g()`` generators, printing elapsed time.

    Both generators are wrapped with ``create_generator_task`` before the
    first loop begins, so ``g2`` is already being driven in the background
    while ``g1`` is still being read.
    """
    g1 = create_generator_task(g())
    g2 = create_generator_task(g())
    t = time.time()
    async for i in g1:
        print(i, time.time() - t)
    async for i in g2:
        print(i, time.time() - t)
Prints:
0 1.0013220310211182
1 2.002387046813965
2 3.0078349113464355
0 3.00785493850708
1 3.007858991622925
2 3.007862091064453