Search code examples
pythonpython-3.xtaskpython-asyncio

CancelledError exception handler not triggering


My exception handler for tasks being cancelled seems to trigger for simple cases, but not when using multiple tasks or asyncio.gather.

Sample code:

import asyncio


async def sleep_func(statement, time, sabotage=False):
    print(f"Starting: {statement}")
    try:
        if sabotage:
            tasks = [
                asyncio.sleep(1000),
                asyncio.sleep(1000),
                asyncio.sleep(1000),
            ]
            await asyncio.gather(*tasks)
        await asyncio.sleep(time)
    except asyncio.CancelledError as e:
        print(f"cancelled {statement}! - {str(e)}")
    except Exception as e:
        print(f"Unhandled exception - {str(e)}")
    print(f"Ending: {statement}")


async def main():
    calls = [
        asyncio.ensure_future(sleep_func("eat", 3)),
        asyncio.ensure_future(sleep_func("pray", 8)),
        asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
    ]
    print("starting!")
    finished, unfinished = await asyncio.wait(calls, timeout=6)
    for task in unfinished:
        task.cancel("This message should be shown when task is cancelled")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

I have three tasks in my main function:

[
    asyncio.ensure_future(sleep_func("eat", 3)),
    asyncio.ensure_future(sleep_func("pray", 8)),
    asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
]

Expected:

  • My first task is not cancelled (this works)

  • My second task is cancelled and prints the message in the
    CancelledError handler correctly (this works)

  • My second task is cancelled and prints the message in the
    CancelledError handler correctly (this does NOT work)

Why is the CancelledError handler not triggering for the last task, which uses asyncio.gather and has a bunch of sub-tasks?

Output:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray

Expected output:

starting!
Starting: eat
Starting: pray
Starting: love
Ending: eat
cancelled pray! - This message should be shown when task is cancelled
Ending: pray
cancelled love! - This message should be shown when task is cancelled
Ending: love

Solution

  • The reason you are not seeing the cancellation of the love task is that you are not awaiting it.

    By the time you get to the end of your cancellation for-loop, the tasks have not been cancelled, they are just cancelling, i.e. in the process of being cancelled.

    Calling the Task.cancel method does not immediately cancel the task. To quote the docs (with my own emphasis), it

    arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.

    To allow the next event loop cycle to actually commence, you need an await expression. That will allow a context switch to one of the couroutines to happen and the CancelledError to actually be raised there.

    But your main coroutine ends without another await, thus without the possibility of a context switch by the event loop. This is why in the example code snippet under the Task.cancel documentation, the tasks that is to be cancelled is awaited at the end.

    Thus, the simplest way to get your desired output would be to simply add await task right below task.cancel(...) in your for-loop:

    ...
    
    async def main():
        calls = [
            asyncio.ensure_future(sleep_func("eat", 3)),
            asyncio.ensure_future(sleep_func("pray", 8)),
            asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
        ]
        print("starting!")
        finished, unfinished = await asyncio.wait(calls, timeout=6)
        for task in unfinished:
            task.cancel("This message should be shown when task is cancelled")
            await task  # <---
    

    As to why one of the two tasks that you called cancel on actually managed to send the CancelledError to its coroutine, I don't know the internals of the loop's run_until_complete method well enough, but suspect that it does allow for another context switch inside it before returning. But I would speculate that this is just an implementation detail and not reliable at all (as demonstrated by your example), even less so, when the number of cancelled tasks increases.


    As an aside, you should probably not be using the loop = asyncio.get_event_loop() pattern anymore due to its deprecation. The canonical way of running your asynchronous main function is via asyncio.run.

    Also, when you have multiple tasks to be cancelled, the pattern is usually to asyncio.gather them after requesting cancellation.

    Lastly, as mentioned in a comment, it is best practice to re-raise a caught CancelledError in a coroutine. You can then avoid it "further up the chain" in your main function by simply passing return_exceptions=True to asyncio.gather for the unfinished cancelled tasks.

    So the changes I would suggest to your example code would look as follows:

    import asyncio
    
    async def sleep_func(statement, time, sabotage=False):
        print(f"Starting: {statement}")
        try:
            if sabotage:
                tasks = [
                    asyncio.sleep(1000),
                    asyncio.sleep(1000),
                    asyncio.sleep(1000),
                ]
                await asyncio.gather(*tasks)
            await asyncio.sleep(time)
        except asyncio.CancelledError as e:
            print(f"cancelled {statement}! - {str(e)}")
            raise e  # <---
        except Exception as e:
            print(f"Unhandled exception - {str(e)}")
        finally:  # <---
            print(f"Ending: {statement}")
    
    async def main():
        calls = [
            asyncio.ensure_future(sleep_func("eat", 3)),
            asyncio.ensure_future(sleep_func("pray", 8)),
            asyncio.ensure_future(sleep_func("love", 10, sabotage=True)),
        ]
        print("starting!")
        finished, unfinished = await asyncio.wait(calls, timeout=6)
        for task in unfinished:
            task.cancel("This message should be shown when task is cancelled")
        await asyncio.gather(*unfinished, return_exceptions=True)  # <---
    
    if __name__ == "__main__":
        asyncio.run(main())  # <---
    

    Output:

    starting!
    Starting: eat
    Starting: pray
    Starting: love
    Ending: eat
    cancelled pray! - This message should be shown when task is cancelled
    Ending: pray
    cancelled love! - This message should be shown when task is cancelled
    Ending: love
    

    PS: Starting with Python 3.11 we have the asyncio.TaskGroup class that provides a convenient context manager for such cases. Utilizing that we could write the main function like this:

    ...
    
    async def main():
        print("starting!")
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(sleep_func("eat", 3)),
                tg.create_task(sleep_func("pray", 8)),
                tg.create_task(sleep_func("love", 10, sabotage=True)),
            ]
            finished, unfinished = await asyncio.wait(tasks, timeout=6)
            for task in unfinished:
                task.cancel("This message should be shown when task is cancelled")
    

    Notice how do not need to await the tasks ourselves here because

    all tasks are awaited when the context manager exits.

    And we also don't need to worry about the propagated CancelledError because that will not "leak" out of the task group context.