
How to cancel all remaining tasks in gather if one fails?


If one of the tasks passed to gather() raises an exception, the others are still allowed to continue.

Well, that's not exactly what I need. I want to distinguish between errors that are fatal and must cancel all remaining tasks, and errors that are not fatal and should instead be logged while the other tasks continue.

Here is my failed attempt to implement this:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())

(The await sleep(5) in the finally block is only there to keep the interpreter from exiting immediately, which would by itself cancel all the tasks.)

Oddly, calling cancel() on the future returned by gather does not actually cancel the remaining tasks!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.

I am very surprised by this behavior since it seems to be contradictory to the documentation, which states:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutine objects or futures.

(...)

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled. (...)

What am I missing here? How to cancel the remaining tasks?


Solution

    The problem with your implementation is that it calls sleepers.cancel() after sleepers has already raised. At that point the future returned by gather() is already in a completed state, so cancelling it is a no-op.

    To correct the code, you just need to cancel the children yourself instead of relying on gather's future to do it. Of course, coroutines are not themselves cancellable, so you need to convert them to tasks first (which gather would do anyway, so you're doing no extra work). For example:

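    import asyncio

    async def main():
        # gather() would wrap the coroutines in tasks anyway; creating the
        # tasks ourselves keeps references we can cancel later.
        tasks = [asyncio.ensure_future(my_sleep(secs))
                 for secs in [2, 5, 7]]
        try:
            await asyncio.gather(*tasks)
        except ErrorThatShouldCancelOtherTasks:
            print('Fatal error; cancelling')
            # The gather() future is already done at this point, so cancel
            # the children directly (a no-op for tasks that have finished).
            for t in tasks:
                t.cancel()
        finally:
            await asyncio.sleep(5)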
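The same idea can be extended to the question's original requirement: fatal errors cancel the siblings, other errors are only logged. The following is a minimal, self-contained sketch of one way to do it, not the only one. The names FatalError, NonFatalError, job, log_non_fatal and run_all are hypothetical, the delays are shortened, and asyncio.run requires Python 3.7+.

```python
import asyncio
import logging

class FatalError(Exception):
    """Hypothetical: errors that must cancel all sibling tasks."""

class NonFatalError(Exception):
    """Hypothetical: errors that should only be logged."""

finished = []  # records which jobs ran to completion

async def job(secs, fail_with=None):
    # Illustrative worker: sleeps, then optionally raises the given error type.
    await asyncio.sleep(secs)
    if fail_with is not None:
        raise fail_with(f'job({secs}) failed')
    finished.append(secs)
    return secs

async def log_non_fatal(coro):
    # Log and swallow non-fatal errors; anything else propagates to gather().
    try:
        return await coro
    except NonFatalError as e:
        logging.warning('non-fatal: %s', e)
        return None

async def run_all(coros):
    tasks = [asyncio.ensure_future(log_non_fatal(c)) for c in coros]
    try:
        return await asyncio.gather(*tasks)
    except FatalError:
        # gather()'s own future is already done, so cancel the children directly.
        for t in tasks:
            t.cancel()
        # Wait until the cancellations have actually been processed.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

async def demo():
    # Non-fatal error: logged, the other jobs still finish.
    results = await run_all([job(0.01), job(0.02, NonFatalError), job(0.03)])
    print(results)  # [0.01, None, 0.03]
    # Fatal error: the still-running 0.5 s job is cancelled.
    try:
        await run_all([job(0.01), job(0.02, FatalError), job(0.5)])
    except FatalError as e:
        print('fatal:', e)
    await asyncio.sleep(0.6)  # the cancelled job never appends to finished
    return results

results = asyncio.run(demo())
print(finished)  # [0.01, 0.03, 0.01]
```

Because every coroutine is wrapped in log_non_fatal, non-fatal errors never reach gather(), so only fatal errors trigger the except clause that cancels the siblings.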
    

    "I am very surprised by this behavior since it seems to be contradictory to the documentation[...]"

    The stumbling block with gather is that it doesn't really run tasks to completion. It's just a helper that submits them to run and waits for them to finish, and the waiting is its primary mission. This is why gather doesn't cancel the remaining tasks if one of them fails with an exception: it just abandons the wait and propagates the exception, leaving the remaining tasks to proceed in the background. This was reported as a bug, but it wasn't fixed, both for backward compatibility and because the behavior is documented and has been unchanged from the beginning.
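The "abandon the wait" behavior can be observed in isolation. In this minimal sketch (Python 3.7+ for asyncio.run; fail_fast, slow and the events list are illustrative names, not from the question), gather() raises as soon as fail_fast fails, yet the other task is neither cancelled nor stopped.

```python
import asyncio

events = []  # illustrative log of what happened, in order

async def fail_fast():
    await asyncio.sleep(0.01)
    raise ValueError('boom')

async def slow():
    await asyncio.sleep(0.05)
    events.append('slow finished')

async def main():
    slow_task = asyncio.ensure_future(slow())
    try:
        await asyncio.gather(fail_fast(), slow_task)
    except ValueError:
        events.append('gather raised')
    # gather() has stopped waiting, but slow_task was not cancelled...
    events.append(f'slow_task cancelled? {slow_task.cancelled()}')
    # ...and it still runs to completion in the background.
    await slow_task

asyncio.run(main())
print(events)
```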

    And now we get to the bigger wart: the documentation explicitly promises that the returned future can be cancelled. Your code does exactly that, and it doesn't work, without it being obvious why (at least it took me a while to figure it out, and it required reading the source). It turns out that the contract of Future actually prevents this from working. By the time you call cancel(), the future returned by gather has already completed, and cancelling a completed future is meaningless; it is a no-op and cancel() simply returns False. (The reason is that a completed future has a well-defined result that could have been observed by outside code. Cancelling it would change its result, which is not allowed.)
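The Future contract described above can be checked directly. This minimal sketch (Python 3.7+ for asyncio.run; fail_fast and the delays are illustrative) calls cancel() on a gather() future twice: once after it has already raised, when cancel() returns False and the child keeps running, and once while it is still pending, when the cancellation is forwarded to the child as the documentation promises.

```python
import asyncio

async def fail_fast():
    await asyncio.sleep(0.01)
    raise ValueError('boom')

async def main():
    # Case 1: cancel() after the gather() future has already raised.
    slow1 = asyncio.ensure_future(asyncio.sleep(0.1))
    outer1 = asyncio.gather(fail_fast(), slow1)
    try:
        await outer1
    except ValueError:
        pass
    late = (outer1.cancel(),    # False: outer1 is already done
            slow1.cancelled())  # False: the child was left running
    await slow1                 # let it finish normally

    # Case 2: cancel() while the gather() future is still pending.
    slow2 = asyncio.ensure_future(asyncio.sleep(0.1))
    outer2 = asyncio.gather(slow2)
    await asyncio.sleep(0.01)   # give slow2 a chance to start
    early = outer2.cancel()     # True: cancellation is forwarded to slow2
    try:
        await outer2
    except asyncio.CancelledError:
        pass
    return late, (early, slow2.cancelled())

result = asyncio.run(main())
print(result)  # ((False, False), (True, True))
```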

    In other words, the documentation is not wrong, because cancelling would have worked if you had done it before await sleepers completed. However, it's misleading, because it appears to allow cancelling gather() in the important case where one of its awaitables raises, but in reality it doesn't. Problems like this that pop up when using gather are the reason why many people eagerly await (no pun intended) trio-style nurseries in asyncio (edit: added many years later in Python 3.11 as asyncio.TaskGroup).
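On Python 3.11+, asyncio.TaskGroup provides this nursery-style behavior: when one task fails with an exception other than CancelledError, the group cancels the remaining tasks, waits for them, and raises the collected errors as an ExceptionGroup. Below is a minimal sketch adapted from the question's example, with the delays scaled down. The version guard is only there so the snippet does nothing, instead of failing, on older interpreters.

```python
import asyncio
import sys

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

finished = []  # records which sleepers ran to completion

async def my_sleep(secs):
    await asyncio.sleep(secs)
    if secs == 0.05:
        raise ErrorThatShouldCancelOtherTasks('0.05 is forbidden!')
    finished.append(secs)
    print(f'Slept for {secs}secs.')

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            for secs in (0.02, 0.05, 0.07):
                tg.create_task(my_sleep(secs))
    except ExceptionGroup as eg:
        # TaskGroup has already cancelled the 0.07 sleeper and waited
        # for it before raising the collected errors here.
        print('Fatal error; remaining tasks were cancelled:', eg.exceptions)

# TaskGroup and ExceptionGroup only exist on Python 3.11+.
if sys.version_info >= (3, 11):
    asyncio.run(main())
    print(finished)  # [0.02]
```

Note that TaskGroup cancels the group on any exception, so errors that should only be logged still have to be caught inside each task.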