
How to convert AsyncIterable to asyncio Task


I am using Python 3.11.5 with the below code:

import asyncio
from collections.abc import AsyncIterable


# Leave this iterable be, the question is about
# how to use many instances of this in parallel
async def iterable() -> AsyncIterable[int]:
    yield 1
    yield 2
    yield 3


# How can one get multiple async iterables to work with asyncio.gather?
# In other words, since asyncio.gather works with asyncio.Task,
# (1) How can one convert an async iterable to a Task?
# (2) How can one use asyncio.gather to run many of these tasks in parallel,
#     keeping the results 1-1 with the source iterables?
results_1, results_2, results_3 = asyncio.gather(iterable(), iterable(), iterable())

To restate the question, how can one:

  • Wrap an AsyncIterable in an asyncio task that iterates it until exhaustion
  • Run several of these tasks concurrently, storing the results on a per-task basis

(e.g. for use with asyncio.gather)?

I am looking for a 1-3 line snippet showing how to connect these dots.


Solution

  • In your snippet, the async generators returned by iterable() are passed directly to asyncio.gather, but gather expects awaitables (coroutines, Tasks, or Futures), and an async generator is not one. One possible fix is a small coroutine that consumes an async iterable and collects its items into a list.

    For example:

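    async def collect(async_iterable):
        # Drain the async iterable into a list
        return [item async for item in async_iterable]
    
    async def main():
        # gather wraps each coroutine in a Task and returns the
        # results in the same order as its arguments
        return await asyncio.gather(
            collect(iterable()),
            collect(iterable()),
            collect(iterable()),
        )
    
    # asyncio.run needs a coroutine, and tasks need a running loop,
    # so the gather call lives inside main()
    results = asyncio.run(main())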
    

    This gives you three [1, 2, 3] lists, but the collectors do not actually interleave: if you add a print inside collect(), you will see each one run to completion before the next one starts. A task only hands control back to the event loop when it awaits something that really suspends, and this generator never does. To see the iterables take turns, the generator needs at least one such await between yields; await asyncio.sleep(0) is a common trick for yielding control once. A generator that awaits real I/O suspends on its own and does not need this trick.
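If the generator has to stay exactly as posted, as the question asks, one variation is to yield control from the collector rather than from the generator. The sketch below assumes a hypothetical helper named collect_cooperatively and a shared log list used only to make the interleaving visible; neither is part of the original code.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def iterable() -> AsyncIterator[int]:
    # The question's generator, unchanged: it never suspends on its own.
    yield 1
    yield 2
    yield 3


async def collect_cooperatively(
    source: AsyncIterable[int], log: list[int]
) -> list[int]:
    # Hypothetical helper: collects the items and, after each one,
    # gives the event loop a chance to run the other tasks.
    result = []
    async for item in source:
        log.append(item)  # shared log, only to show the ordering
        result.append(item)
        await asyncio.sleep(0)
    return result


async def main() -> tuple[list[list[int]], list[int]]:
    log: list[int] = []
    # The "1-3 line" part: one collector per iterable, unpacked into gather.
    results = await asyncio.gather(
        *(collect_cooperatively(iterable(), log) for _ in range(3))
    )
    return results, log


results, log = asyncio.run(main())
print(results)  # one list per source iterable, in argument order
print(log)      # the order in which items were consumed across tasks
```

The full listing that follows takes the other route and places the await inside the generator itself.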

    Here's the whole code:

    import asyncio
    from collections.abc import AsyncIterable
    
    async def iterable() -> AsyncIterable[int]:
        yield 1
        await asyncio.sleep(0)
        yield 2
        await asyncio.sleep(0)
        yield 3
    
    async def collect(async_iterable: AsyncIterable[int]) -> list:
        result = []
        async for item in async_iterable:
            print(item)
            result.append(item)
        return result
    
    async def main():
        tasks = [
            asyncio.create_task(collect(iterable())),
            asyncio.create_task(collect(iterable())),
            asyncio.create_task(collect(iterable()))
        ]
        results = await asyncio.gather(*tasks)
        return results
    
    results_1, results_2, results_3 = asyncio.run(main())
    print(results_1, results_2, results_3)
    

    Out:

    1
    1
    1
    2
    2
    2
    3
    3
    3
    [1, 2, 3] [1, 2, 3] [1, 2, 3]
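
On the second part of the question, keeping results 1-1 with the source iterables: asyncio.gather returns its results in the order of its arguments, not in the order the tasks finish. A minimal sketch, assuming a hypothetical generator countdown(label, delay) that stands in for sources running at different speeds:

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def countdown(label: str, delay: float) -> AsyncIterator[str]:
    # Hypothetical stand-in for a real source; the sleep simulates I/O.
    for n in range(3):
        await asyncio.sleep(delay)
        yield f"{label}{n}"


async def collect(source: AsyncIterable[str], finished: list[str], name: str) -> list[str]:
    items = [item async for item in source]
    finished.append(name)  # record completion order for inspection
    return items


async def main() -> tuple[list[str], list[str], list[str]]:
    finished: list[str] = []
    # The slow source is passed first, so its result comes back first,
    # regardless of which collector completes earlier.
    slow, fast = await asyncio.gather(
        collect(countdown("slow", 0.02), finished, "slow"),
        collect(countdown("fast", 0.001), finished, "fast"),
    )
    return slow, fast, finished


slow, fast, finished = asyncio.run(main())
print(slow)
print(fast)
print(finished)  # completion order, which may differ from argument order
```

Unpacking the gather result positionally, as in results_1, results_2, results_3 above, therefore lines up with the iterables in the order they were passed.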