
Does asyncio.as_completed yield Futures or coroutines?


From the asyncio docs:

asyncio.as_completed(aws, *, loop=None, timeout=None)

Run awaitable objects in the aws set concurrently. Return an iterator of Future objects. Each Future object returned represents the earliest result from the set of the remaining awaitables.

I would assume each of these Future objects has the methods described in asyncio.Future: .cancelled(), .exception(), and .result(). But it appears that the yielded elements are just coroutines, not Future objects. What am I missing?

This seems to defeat the description of .as_completed(). How is the coroutine "completed" if I need to await it?

>>> import asyncio
>>> import aiohttp
>>> 
>>> async def get(session, url):
...     async with session.request('GET', url) as resp:
...         t = await resp.text()
...         return t
... 
>>> async def bulk_as_completed(urls):
...     async with aiohttp.ClientSession() as session:
...         aws = [get(session, url) for url in urls]
...         for future in asyncio.as_completed(aws):
...             for i in ('cancelled', 'exception', 'result'):
...                 print(hasattr(future, i))
...             print(type(future))
...             try:
...                 result = await future
...             except:
...                 pass
...             else:
...                 print(type(result))
...                 print()
... 
>>> 
>>> urls = (
...     'https://docs.python.org/3/library/asyncio-task.html',
...     'https://docs.python.org/3/library/select.html',
...     'https://docs.python.org/3/library/this-page-will-404.html',
... )
>>> 
>>> asyncio.run(bulk_as_completed(urls))
False
False
False
<class 'coroutine'>
<class 'str'>

False
False
False
<class 'coroutine'>
<class 'str'>

False
False
False
<class 'coroutine'>
<class 'str'>

Ultimately, the reason I care about this is that I'd like to capture exceptions as ordinary results, the way asyncio.gather(..., return_exceptions=True) does, instead of having them abort the loop. Consider adding one bogus URL that will raise when session.request() is called:

urls = (
    'https://docs.python.org/3/library/asyncio-task.html',
    'https://docs.python.org/3/library/select.html',
    'https://docs.python.org/3/library/this-page-will-404.html',

    # This URL will raise on session.request().  How can I propagate
    # that exception to the iterator of results?
    'https://asdfasdfasdf-does-not-exist-asdfasdfasdf.com'
)
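
For comparison, here is a minimal sketch of the gather behavior referred to above. It replaces the real HTTP requests with a hypothetical fetch coroutine (not part of the original code) that raises for one input, so it runs without aiohttp or network access.

```python
import asyncio


async def fetch(name):
    # Hypothetical stand-in for get(session, url); "bad" simulates a failed request.
    await asyncio.sleep(0)
    if name == "bad":
        raise ConnectionError(f"cannot reach {name}")
    return f"body of {name}"


async def gather_all(names):
    # With return_exceptions=True, exceptions are returned in the result list
    # (in input order) instead of being raised out of gather().
    return await asyncio.gather(*(fetch(n) for n in names), return_exceptions=True)


if __name__ == "__main__":
    for res in asyncio.run(gather_all(["a", "bad", "b"])):
        print(type(res).__name__, res)
```

Note that gather only returns once every awaitable has finished, which is why it is not a substitute for processing results in completion order.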

What I would like to be able to do is something like this (using the methods of a Future object, but these aren't Future objects at all, which is the problem):

async def bulk_as_completed(urls):
    async with aiohttp.ClientSession() as session:
        aws = [get(session, url) for url in urls]
        for future in asyncio.as_completed(aws):
            if future.cancelled():
                res = asyncio.CancelledError()
            else:
                exc = future.exception()
                if exc is not None:
                    res = exc
                else:
                    res = future.result()
            # ...
            # [Do something with `res`]

Solution

  • In Python 3.13, as_completed was improved to expose an async iterator interface. As a result, bulk_as_completed should work almost exactly as written in the question, except it should use async for instead of for.
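
As a rough sketch of what that looks like, assuming Python 3.13 or later: under async for, each yielded object is an already finished task, so the Future methods from the question can be called directly. The work coroutine is a hypothetical stand-in for get(session, url), and the script only runs the demo when the interpreter is new enough.

```python
import asyncio
import sys


async def work(delay, fail=False):
    # Hypothetical stand-in for the question's get(session, url).
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"failed after {delay}")
    return delay


async def collect_outcomes(aws):
    """Collect results and exceptions in completion order (needs Python 3.13+)."""
    outcomes = []
    async for task in asyncio.as_completed(aws):
        # Under async iteration the yielded task is already done.
        if task.cancelled():
            outcomes.append(asyncio.CancelledError())
        elif task.exception() is not None:
            outcomes.append(task.exception())
        else:
            outcomes.append(task.result())
    return outcomes


if __name__ == "__main__" and sys.version_info >= (3, 13):
    print(asyncio.run(collect_outcomes([work(0.1), work(0.05, fail=True)])))
```

On older interpreters the async for line fails at runtime, because as_completed only returns a plain iterator there.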

    For earlier Python versions, the remainder of the answer applies.


    "What I would like to be able to do is something like this [...]"

    Perhaps not quite as convenient, but you should be able to extract the exception with code like this:

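    async def bulk_as_completed(urls):
        async with aiohttp.ClientSession() as session:
            aws = [get(session, url) for url in urls]
            for future in asyncio.as_completed(aws):
                # `future` is an awaitable placeholder; awaiting it waits for
                # the next awaitable to finish and returns its result or
                # re-raises its exception.
                try:
                    res = await future
                except Exception as e:
                    # Keep the exception as the result, similar to
                    # gather(..., return_exceptions=True).
                    res = e
                # ...
                # [Do something with `res`]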
    

    "This [yielding coroutines rather than futures] seems to defeat the description of .as_completed(). How is the coroutine "completed" if I need to await it?"

    In short, it's not completed when you get it; it is a placeholder that you await to get the result of the next awaitable to complete (or to have its exception raised).

    When asyncio.as_completed was first implemented, asynchronous iterators didn't exist. Without asynchronous iteration there was no way for a for loop to produce futures as they complete, so as_completed faked it by (immediately) yielding intermediate awaitables which one has to await to get the actual result.
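
The "immediately yielding" part can be observed with a small sketch like the one below. It is only an illustration: placeholders_demo is a hypothetical helper, and asyncio.sleep(d, result=d) stands in for real work. All placeholders are collected before anything has finished, and awaiting them in order then returns results in completion order.

```python
import asyncio


async def placeholders_demo(delays):
    aws = [asyncio.sleep(d, result=d) for d in delays]
    # Exhaust the plain iterator right away: no awaitable has finished yet,
    # but one placeholder per input is handed out immediately.
    placeholders = list(asyncio.as_completed(aws))
    # As in the question, the placeholders have no Future-style result().
    has_result = [hasattr(p, "result") for p in placeholders]
    # Awaiting the placeholders one by one gives results in completion order.
    results = [await p for p in placeholders]
    return has_result, results


if __name__ == "__main__":
    print(asyncio.run(placeholders_demo([0.15, 0.05, 0.1])))
```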

    Even if as_completed yielded actual futures, it wouldn't help with your use case because those futures wouldn't complete without someone awaiting them. To provide the expected semantics of as_completed yielding completed futures, as_completed needs to implement asynchronous iteration, whose equivalent of __next__ can await.
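
To illustrate the point about asynchronous iteration, here is one possible sketch of an async generator that yields finished tasks on versions before 3.13. It is not how as_completed is implemented; iter_completed and job are hypothetical names, and the helper is built on asyncio.wait with return_when=FIRST_COMPLETED.

```python
import asyncio


async def iter_completed(aws):
    """Yield each awaitable's task once it is done (hypothetical helper)."""
    pending = {asyncio.ensure_future(aw) for aw in aws}
    while pending:
        # Awaiting between items is exactly what a plain __next__ cannot do.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            yield task


async def job(delay, fail=False):
    # Hypothetical stand-in for the question's get(session, url).
    await asyncio.sleep(delay)
    if fail:
        raise ValueError("boom")
    return delay


async def main():
    outcomes = []
    async for task in iter_completed([job(0.1), job(0.05, fail=True)]):
        # The task is finished, so exception() and result() return at once.
        exc = task.exception()
        outcomes.append(exc if exc is not None else task.result())
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This sketch does not handle cancelled tasks (task.exception() raises CancelledError for those) and leaves the remaining tasks running if the consumer stops iterating early.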

    The surprising behavior of as_completed has been brought up before, and I filed an issue to fix it by providing async iteration. With that change in place (Python 3.13, as noted at the top of this answer), your original code works with just the for changed to async for.