
Why can any() not accept an async generator?


I'm fairly new to async in Python and trying to understand it more deeply. I ran into this seeming inconsistency that I'd like to understand.

Given this setup:

import asyncio

async def return_true():  # In my actual use case this executes a DB query that returns True/False
    return True

async def async_range(count):  # In my actual use case this is an iterator as the result of a DB query that may stream results
    for i in range(count):
        yield i
        await asyncio.sleep(0.0)

When I run this:

any(await return_true() async for i in async_range(10))

I get this error:

TypeError: 'async_generator' object is not iterable

When I change it to any([await return_true() async for i in async_range(10)]), it runs without issue. But in my non-toy example, that means I have to wait for all the DB queries to return, even though I might not care about all of them: the first one to return True is enough for any() to return True.

So, my question is: is this expected? Is this just something that Python hasn't gotten around to implementing in an async-compatible way yet (and are there plans to do that?), or is there a separate library I should be using that implements these built-ins in async-compatible ways?

P.S. I did find this answer that seems to implement the behavior I'm looking for, but does so in a much more verbose way.


Solution

    Different protocols

    Why can any() not accept an async generator?

    The superficial answer is frustratingly simple: Because it is the wrong type.

    any accepts an iterable, which means an object implementing the __iter__ method. That is literally the only requirement for that protocol. In layman's terms, you must pass something to any that would also "work in a for-loop".

    But the expression

    (element async for element in async_iterable)
    

    returns an asynchronous generator, which is a subtype of the asynchronous iterator, which in turn is a subtype of the asynchronous iterable. It is not a subtype of the "normal" iterable. It does not have the __iter__ method. It does not work in a for-loop.

    Try to jam an async_generator object into a for-loop and you'll get that exact TypeError you already saw:

    TypeError: 'async_generator' object is not iterable
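
The mismatch can also be checked directly by looking at which protocol methods the object provides. A minimal sketch; `numbers` is a hypothetical stand-in for any async generator function:

```python
from collections.abc import AsyncIterable, Iterable


async def numbers():  # hypothetical stand-in for any async generator function
    yield 1


agen = numbers()

# The asynchronous iteration protocol is implemented ...
has_aiter = hasattr(agen, "__aiter__")
is_async_iterable = isinstance(agen, AsyncIterable)

# ... but the regular iteration protocol is not.
has_iter = hasattr(agen, "__iter__")
is_iterable = isinstance(agen, Iterable)

print(has_aiter, is_async_iterable, has_iter, is_iterable)
```

The first two checks are true and the last two are false, which is why the built-in any rejects the object.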
    

    You are dealing with a different protocol that requires a different setup. If you want a function that supports asynchronous iterables, it would necessarily have to return a coroutine itself (i.e. you would have to await it). PEP 492 tells us:

    It is a SyntaxError to use async for outside of an async def function.


    Side note

    I think it is worth mentioning here that asynchronous generators are never concurrent in their own iteration; that would not make much sense. They can be concurrent with other coroutines in the application. That is their purpose.

    This means that the items yielded by something like your async_range are still always yielded sequentially. It is just that you can run that iteration concurrently with other coroutines and the event loop may switch contexts to those other coroutines in between each iteration.

    It helps to remember what async for actually represents.

    This

    async for element in async_iterable:
        ... # do something with `element`
    

    is semantically equivalent to this:

    async_iterator = async_iterable.__aiter__()
    running = True
    while running:
        try:
            element = await async_iterator.__anext__()
        except StopAsyncIteration:
            running = False
        else:
            ... # do something with `element`
    

    This means you always await one item after the other, deterministically, sequentially.
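
The equivalence can be tried out with a small runnable sketch. The generator `letters` and the two helper coroutines are made up for this demo; both loops should collect the same items in the same order:

```python
import asyncio


async def letters():  # hypothetical async generator used only for this demo
    for letter in "abc":
        yield letter
        await asyncio.sleep(0)


async def with_async_for():
    seen = []
    async for element in letters():
        seen.append(element)
    return seen


async def with_manual_loop():
    seen = []
    async_iterator = letters().__aiter__()
    while True:
        try:
            # One item is awaited at a time; the next is requested only afterwards.
            element = await async_iterator.__anext__()
        except StopAsyncIteration:
            break
        seen.append(element)
    return seen


async def compare():
    return await with_async_for(), await with_manual_loop()


first, second = asyncio.run(compare())
print(first == second)
```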

    I thought it important to mention this because the way you phrased your question made it seem as though you thought you could somehow run all the coroutines corresponding to your generator items concurrently. You cannot, because in your example they are not independent of one another. The next element from the async iterator is only yielded after the previous one has been awaited.


    DIY any solution

    It is fairly straightforward to implement an asynchronous counterpart to any yourself.

    Remember that the any function is semantically equivalent to this:

    from collections.abc import Iterable
    
    
    def any(iterable: Iterable[object]) -> bool:
        for element in iterable:
            if element:
                return True
        return False
    

    In words: "Keep grabbing one element after another from the iterable, until one of them is truthy."

    The async version would look like this:

    from collections.abc import AsyncIterable
    
    
    async def async_any(async_iterable: AsyncIterable[object]) -> bool:
        async for element in async_iterable:
            if element:
                return True
        return False
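
Applied to a setup like the one in the question, this sketch suggests that iteration stops at the first truthy element. The `pulled` list and the `is_match` coroutine are assumptions added for the demo; `is_match` merely stands in for the DB query:

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator


async def async_any(async_iterable: AsyncIterable[object]) -> bool:
    async for element in async_iterable:
        if element:
            return True
    return False


pulled = []  # demo-only record of which items were requested from the generator


async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        pulled.append(i)
        yield i
        await asyncio.sleep(0)


async def is_match(i: int) -> bool:
    # Hypothetical stand-in for the DB query from the question.
    await asyncio.sleep(0)
    return i == 3


async def main() -> bool:
    return await async_any(await is_match(i) async for i in async_range(10))


result = asyncio.run(main())
print(result, pulled)
```

Only the items 0 through 3 are requested here; the remaining six are never produced and their checks never run.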
    

    So if you want, you can replace the built-in any with your own overloaded function that works on both asynchronous and non-asynchronous iterables like this:

    import builtins
    from collections.abc import AsyncIterable, Awaitable, Iterable
    from typing import Union, overload
    
    
    async def async_any(async_iterable: AsyncIterable[object]) -> bool:
        async for element in async_iterable:
            if element:
                return True
        return False
    
    
    @overload
    def any(iterable: AsyncIterable[object]) -> Awaitable[bool]: ...
    
    
    @overload
    def any(iterable: Iterable[object]) -> bool: ...
    
    
    def any(
        iterable: Union[AsyncIterable[object], Iterable[object]]
    ) -> Union[Awaitable[bool], bool]:
        if isinstance(iterable, AsyncIterable):
            return async_any(iterable)
        return builtins.any(iterable)
    

    This will obviously still necessitate awaiting the coroutine returned in case you pass an asynchronous iterable to it. But you could now at least call it either way.

    Demo: (with the custom any)

    from asyncio import run, sleep
    from collections.abc import AsyncIterator
    
    
    async def async_range(count: int) -> AsyncIterator[int]:
        for i in range(count):
            print("yielding", i)
            yield i
            await sleep(0.1)
    
    
    async def main() -> None:
        if await any(i >= 2 async for i in async_range(5)):
            print("At least one greater or equal to 2")
        else:
            print("All less than 2")
    
        if any(i >= 5 for i in range(5)):
            print("At least one greater or equal to 5")
        else:
            print("All less than 5")
    
    
    run(main())
    

    Output:

    yielding 0
    yielding 1
    yielding 2
    At least one greater or equal to 2
    All less than 5
    

    I am not sure if this is a good idea, but as you can see, it is possible. The danger I see here is that you could forget to await the asynchronous version, and the coroutine object returned is always truthy. But you would at least get a warning from the interpreter about never having awaited it.
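
To make that pitfall concrete, here is a small sketch with a hypothetical coroutine `always_false`. The coroutine object is truthy even though its awaited result is falsy:

```python
import asyncio


async def always_false() -> bool:
    # Hypothetical coroutine whose awaited result is falsy.
    return False


async def main() -> tuple:
    coro = always_false()
    unawaited_is_truthy = bool(coro)  # truthiness of the coroutine object itself
    awaited_result = await coro       # awaiting yields the actual answer
    return unawaited_is_truthy, awaited_result


unawaited_is_truthy, awaited_result = asyncio.run(main())
print(unawaited_is_truthy, awaited_result)
```

An `if` statement that tests the un-awaited call would therefore always take the true branch.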

    As to why this is not supported out of the box and whether there are even plans for it, I don't know.

    I found a library called asyncstdlib that aims to fill this gap. Its async-any implementation is different in principle from what I showed above in that it always returns a coroutine because it wraps regular iterables in asynchronous ones internally.
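
The "always return a coroutine" principle can be sketched with the standard library alone. This is not asyncstdlib's actual code; `as_async_iter` and `always_async_any` are hypothetical names used only to illustrate wrapping regular iterables in asynchronous ones:

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Iterable
from typing import Union


async def as_async_iter(
    iterable: Union[AsyncIterable[object], Iterable[object]]
) -> AsyncIterator[object]:
    # Hypothetical helper: present both kinds of iterable as an async iterator.
    if isinstance(iterable, AsyncIterable):
        async for element in iterable:
            yield element
    else:
        for element in iterable:
            yield element


async def always_async_any(
    iterable: Union[AsyncIterable[object], Iterable[object]]
) -> bool:
    # Always a coroutine function, so callers always await the result.
    async for element in as_async_iter(iterable):
        if element:
            return True
    return False


async def async_values() -> AsyncIterator[int]:
    for value in (0, 0, 7):
        yield value


async def main():
    from_sync = await always_async_any([0, "", None])
    from_async = await always_async_any(async_values())
    return from_sync, from_async


from_sync, from_async = asyncio.run(main())
print(from_sync, from_async)
```

With this design the call site looks the same for both kinds of input, which avoids the forgotten-await ambiguity at the cost of requiring an async context even for plain lists.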


    If you are interested in more details about all the protocols involved here, you may want to check out this question:

    In Python, what is the difference between `async for x in async_iterator` and `for x in await async_iterator`?