
Parallelize AsyncGenerators

I am looking for an asyncio.create_task equivalent for AsyncGenerator objects. I want the generator to start executing in the background right away, without my having to await its results explicitly.

For example:

import asyncio
import time

async def g():
    for i in range(3):
        await asyncio.sleep(1.0)
        yield i

async def main():
    g1 = g()
    g2 = g()
    t = time.time()
    async for i in g1:
        print(i, time.time() - t)
    async for i in g2:
        print(i, time.time() - t)

asyncio.run(main())

This takes about 6 seconds, because g2 only starts running once g1 is exhausted:

0 1.001204013824463
1 2.0024218559265137
2 3.004373788833618
0 4.00572395324707
1 5.007828950881958
2 6.009296894073486

If both generators ran concurrently, the whole run would take only ~3 seconds. What is the recommended approach here?
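For comparison, if the goal is only to consume both generators at the same time (rather than to start them early and read them later in sequence), asyncio.gather over one draining coroutine per generator already overlaps the sleeps. This is a minimal sketch, not the approach from the answer below; drain, the delay parameter and the log list are hypothetical names added for illustration and testing.

```python
import asyncio
import time
from typing import AsyncIterator, List, Tuple


async def g(delay: float = 1.0) -> AsyncIterator[int]:
    # Same generator as in the question, with the sleep length made configurable.
    for i in range(3):
        await asyncio.sleep(delay)
        yield i


async def drain(label: str, agen: AsyncIterator[int], t0: float,
                log: List[Tuple[str, int, float]]) -> None:
    # Hypothetical helper: consume one async generator to completion,
    # printing and recording (label, item, seconds since t0).
    async for item in agen:
        elapsed = time.monotonic() - t0
        print(label, item, round(elapsed, 3))
        log.append((label, item, elapsed))


async def main(delay: float = 1.0) -> List[Tuple[str, int, float]]:
    log: List[Tuple[str, int, float]] = []
    t0 = time.monotonic()
    # gather wraps each drain() coroutine in a task, so both generators
    # advance on the same event loop and their sleeps overlap.
    await asyncio.gather(drain("g1", g(delay), t0, log),
                         drain("g2", g(delay), t0, log))
    return log
```

Calling asyncio.run(main()) should finish in roughly 3 seconds instead of 6, with g1 and g2 items interleaved. The trade-off is that each item must be handled inside its own drain() call, so this does not give you an iterator that keeps running in the background and can be read later, which is what the approach below aims for.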


  • This is the approach I came up with:

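    import asyncio
    from typing import Any, AsyncGenerator, AsyncIterator, Optional, Tuple

    async def consume(a_iter: AsyncIterator) -> Tuple[Any, Optional[asyncio.Task]]:
        # Fetch one item, then immediately schedule a task for the next one,
        # so the source keeps running in the background.
        try:
            return await a_iter.__anext__(), asyncio.create_task(consume(a_iter))
        except StopAsyncIteration:
            return None, None

    def create_generator_task(gen: AsyncGenerator) -> AsyncGenerator:
        # Must be called while an event loop is running (e.g. inside a coroutine).
        result_queue = asyncio.create_task(consume(gen.__aiter__()))

        async def consumer():
            nonlocal result_queue
            while True:
                item, result_queue = await result_queue
                if result_queue is None:
                    # The source generator is exhausted.
                    assert item is None
                    return
                yield item

        return consumer()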
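A more conventional standard-library pattern, swapped in here as an alternative rather than taken from the answer, is to start a producer task with asyncio.create_task that pushes items into an asyncio.Queue, and hand back an async generator that reads from that queue. The name prefetch and the maxsize parameter are hypothetical. Unlike the task chain above, which lets the source run arbitrarily far ahead of the reader, a positive maxsize bounds the buffer; in this sketch, exceptions from the source are re-raised in the reader, and closing the reader before the source is exhausted cancels the producer.

```python
import asyncio
from typing import Any, AsyncIterator, Tuple, TypeVar

T = TypeVar("T")


def prefetch(agen: AsyncIterator[T], maxsize: int = 0) -> AsyncIterator[T]:
    """Hypothetical helper: start iterating `agen` in the background right away.

    Must be called while an event loop is running (e.g. inside a coroutine).
    maxsize=0 gives an unbounded buffer; a positive maxsize makes the
    producer wait once that many items are queued.
    """
    # Each entry is (finished, value): (False, item) for data,
    # (True, None) for normal exhaustion, (True, exc) for an error.
    queue: "asyncio.Queue[Tuple[bool, Any]]" = asyncio.Queue(maxsize)

    async def producer() -> None:
        try:
            async for item in agen:
                await queue.put((False, item))
        except Exception as exc:
            await queue.put((True, exc))  # forward the error to the reader
        else:
            await queue.put((True, None))  # signal normal exhaustion

    # Keep a strong reference: the event loop only holds weak refs to tasks.
    task = asyncio.create_task(producer())

    async def reader() -> AsyncIterator[T]:
        try:
            while True:
                finished, value = await queue.get()
                if finished:
                    if value is not None:
                        raise value
                    return
                yield value
        finally:
            # No-op if the producer already finished; otherwise stop it
            # when the reader is closed before the source is exhausted.
            task.cancel()

    return reader()
```

Used in the question's main() as g1 = prefetch(g()) and g2 = prefetch(g()), this should give the same ~3 second total as the helper above, since both producers start sleeping as soon as the loop gets control.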

    Unless there is an official standard-library solution I am missing, this method works pretty well. Example:

    async def main():
        g1 = create_generator_task(g())
        g2 = create_generator_task(g())
        t = time.time()
        async for i in g1:
            print(i, time.time() - t)
        async for i in g2:
            print(i, time.time() - t)

    asyncio.run(main())


    0 1.0013220310211182
    1 2.002387046813965
    2 3.0078349113464355
    0 3.00785493850708
    1 3.007858991622925
    2 3.007862091064453