Search code examples
pythonpython-3.xpython-3.4python-asyncio

Please explain "Task was destroyed but it is pending!" after cancelling tasks


I am learning asyncio with Python 3.4.2 and I use it to continuously listen on an IPC bus, while gbulb listens on the DBus.

I created a function listen_to_ipc_channel_layer that continuously listens for incoming messages on the IPC channel and passes the message to message_handler.

I am also listening to SIGTERM and SIGINT. When I send a SIGTERM to the python process running the code you find at the bottom, the script should terminate gracefully.

The problem I am having is the following warning:

got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0

…with the following code:

import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()

I still only understand very little of asyncio, but I think I know what is going on. While waiting for yield from asyncio.sleep(0.1) the signal handler caught the SIGTERM and in that process it calls task.cancel().

Shouldn't this trigger the CancelledError within the while True: loop? (Because it is not, but that is how I understand "Calling cancel() will throw a CancelledError to the wrapped coroutine").

Eventually loop.stop() is called which stops the loop without waiting for either yield from asyncio.sleep(0.1) to return a result or even the whole coroutine listen_to_ipc_channel_layer.

Please correct me if I am wrong.

I think the only thing I need to do is to make my program wait for the yield from asyncio.sleep(0.1) to return a result and/or coroutine to break out the while loop and finish.

I believe I confuse a lot of things. Please help me get those things straight so that I can figure out how to gracefully close the event loop without warning.


Solution

  • The problem comes from closing the loop immediately after cancelling the tasks. As the cancel() docs state

    "This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop."

    Take this snippet of code:

    import asyncio
    import signal
    
    
    async def pending_doom():
        await asyncio.sleep(2)
        print(">> Cancelling tasks now")
        for task in asyncio.Task.all_tasks():
            task.cancel()
    
        print(">> Done cancelling tasks")
        asyncio.get_event_loop().stop()
    
    
    def ask_exit():
        for task in asyncio.Task.all_tasks():
            task.cancel()
    
    
    async def looping_coro():
        print("Executing coroutine")
        while True:
            try:
                await asyncio.sleep(0.25)
            except asyncio.CancelledError:
                print("Got CancelledError")
                break
    
            print("Done waiting")
    
        print("Done executing coroutine")
        asyncio.get_event_loop().stop()
    
    
    def main():
        asyncio.async(pending_doom())
        asyncio.async(looping_coro())
    
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, ask_exit)
    
        loop.run_forever()
    
        # I had to manually remove the handlers to
        # avoid an exception on BaseEventLoop.__del__
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)
    
    
    if __name__ == '__main__':
        main()
    

    Notice ask_exit cancels the tasks but does not stop the loop, on the next cycle looping_coro() stops it. The output if you cancel it is:

    Executing coroutine
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    ^CGot CancelledError
    Done executing coroutine
    

    Notice how pending_doom cancels and stops the loop immediately after. If you let it run until the pending_doom coroutines awakes from the sleep you can see the same warning you're getting:

    Executing coroutine
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    >> Cancelling tasks now
    >> Done cancelling tasks
    Task was destroyed but it is pending!
    task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>