Tags: python, python-3.x, multithreading, asynchronous, python-asyncio

Is it safe to create an asyncio event loop in one thread and run it in another thread while having the ability to cancel it from outside that thread (Python 3)?


I want to know whether it is safe to create an asyncio event loop in one thread and run it in another, while still being able to cancel work from outside the thread in which the loop is running. In short, I want to run an async function in a thread and be able to cancel it from outside that thread.

Here is the code I am currently using. I want to know if this is the correct approach or if there is a simpler/better way to do this:

import asyncio
from concurrent.futures import ThreadPoolExecutor, CancelledError
import time

pool = ThreadPoolExecutor()
loop = asyncio.new_event_loop()


def start_event_loop(loop):
    asyncio.set_event_loop(loop)  # is this necessary?
    loop.run_forever()


pool.submit(start_event_loop, loop)


async def long_task():
    try:
        while True:
            print("Running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")


future = asyncio.run_coroutine_threadsafe(long_task(), loop)

time.sleep(5)
loop.call_soon_threadsafe(future.cancel)

try:
    future.result()
except CancelledError:
    time.sleep(1)
    print("Future was cancelled")

loop.call_soon_threadsafe(loop.stop)  # can i reuse the loop?

while loop.is_running():
    print("Waiting for loop to stop...")
    time.sleep(0.1)

loop.close() # is this necessary?
print("Loop stopped")

This is based on this article and this question.

Also, there are a few more questions I'd like to ask:

  1. Is it necessary to use asyncio.set_event_loop? The code produces the same output with and without it.

  2. Is it okay to reuse the event loop after stopping it or should I create a new loop each time after stopping the loop?

  3. Is it necessary to close the loop after you are done with it?


Solution

  • No, you need to call neither asyncio.set_event_loop nor loop.close().

    But the comment # can i reuse the loop?, placed after you have stopped the loop, is a bit baffling to me. Yes, you could reuse the loop after calling stop (though not after calling close), but how would you reuse it in a way that differs from the way it is currently being used? I would simply not stop the loop at all. If it has no work to do, then the thread it is running in should not be using any CPU cycles. I would just leave the loop "running" (albeit idle), so that if you again need to run a task in a thread other than the main one, you already have the necessary running loop.
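
To make the difference between stopping and closing concrete, here is a minimal single-threaded sketch. The names runs and restarted_after_close are illustrative only. It restarts a stopped loop with run_forever and then shows that a closed loop refuses to run again:

```python
import asyncio

loop = asyncio.new_event_loop()
runs = []

# Start and stop the same loop twice. A stopped loop can simply be run again.
for i in range(2):
    loop.call_soon(runs.append, i)   # some work for this run
    loop.call_soon(loop.stop)        # ask the loop to stop after the pending callbacks
    loop.run_forever()               # returns once the stop request has been processed

loop.close()

# After close() the loop cannot be restarted; asyncio raises RuntimeError.
try:
    loop.run_forever()
    restarted_after_close = True
except RuntimeError:
    restarted_after_close = False

print(runs, restarted_after_close)
```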

    In fact, the code could be simplified to:

    import asyncio
    from concurrent.futures import CancelledError
    import time
    import threading
    
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    
    async def long_task():
        try:
            while True:
                print("Running...")
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("Task was cancelled")
    
    
    future = asyncio.run_coroutine_threadsafe(long_task(), loop)
    
    time.sleep(5)
    loop.call_soon_threadsafe(future.cancel)
    
    try:
        future.result()
    except CancelledError:
        # Why is this call to time.sleep being made?
        #time.sleep(1)
        print("Future was cancelled")
    

    Prints:

    Running...
    Running...
    Running...
    Running...
    Running...
    Future was cancelled
    Task was cancelled
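
The output order above shows that the waiting thread can see the cancellation before the coroutine's own handler has run, which is presumably what the time.sleep(1) in the question was papering over. If the caller needs to know that the coroutine has finished cleaning up, one option is a threading.Event that the coroutine sets in a finally block. This is only a sketch: cleanup_done and events are names introduced here for illustration, and the delays are shortened.

```python
import asyncio
import threading
import time
from concurrent.futures import CancelledError

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

cleanup_done = threading.Event()  # hypothetical signal: set once the coroutine has unwound
events = []

async def long_task():
    try:
        while True:
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        events.append("task cancelled")
        raise  # let the task finish in the cancelled state
    finally:
        cleanup_done.set()

future = asyncio.run_coroutine_threadsafe(long_task(), loop)

time.sleep(0.2)  # give the task time to start
loop.call_soon_threadsafe(future.cancel)

try:
    future.result()
except CancelledError:
    pass

# Block until the coroutine's cleanup has really run (bounded by a timeout).
finished = cleanup_done.wait(timeout=5)
events.append("caller continues")
print(events)
```

Waiting on the event replaces the fixed sleep with an explicit handshake, so the caller proceeds as soon as the cleanup is done and no later.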
    

    Update

    After testing the above code on Linux (I had previously been testing only under Windows), it now seems that you do need to cancel a future from the thread that is running the event loop, for example with loop.call_soon_threadsafe(future.cancel).

    If for some reason you need to close the loop, do so only after the loop has been stopped and the original invocation of asyncio.loop.run_forever() in the child thread has returned. To be completely safe, call close from the same thread in which the loop had been running. This is because close attempts to shut down any default executor that may have been started in response to a call to asyncio.loop.run_in_executor(None, some_function), and this may hang if done from a different thread.

    So now we need a new runner function that calls asyncio.loop.run_forever() and then calls asyncio.loop.close(). Here are the changes we need to make:

    1. Save a reference to the child thread that is running the event loop. The child thread no longer needs to be a daemon thread since it will be terminating. In this thread we run a function, runner, that calls asyncio.loop.run_forever followed by asyncio.loop.close. The asyncio.loop.run_forever call will eventually return in response to asyncio.loop.stop being called. Since the stop method only sets a flag telling the loop to stop running, you should be able to call it from any thread. But I have had an issue where the code hangs under Linux, so you should call stop in the thread running the event loop with loop.call_soon_threadsafe(loop.stop).
    2. After the call to close has completed, the child thread will terminate. We will know that close has completed by joining the child thread.

    import asyncio
    from concurrent.futures import CancelledError
    import time
    import threading
    
    def runner(loop):
        loop.run_forever()
        loop.close()
    
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=runner, args=(loop,))
    t.start()
    
    def worker():
        print('worker starting')
        time.sleep(1)
        print('worker ending')
    
    async def long_task():
        try:
            await asyncio.get_running_loop().run_in_executor(None, worker)
            while True:
                print("Running...")
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("Task was cancelled")
    
    
    future = asyncio.run_coroutine_threadsafe(long_task(), loop)
    
    time.sleep(5)
    loop.call_soon_threadsafe(future.cancel)
    
    try:
        future.result()
    except CancelledError:
        print("Future was cancelled")
    
    # Now there are no tasks running in the event loop that need awaiting
    # Stop the event loop (call stop in the thread running the loop):
    loop.call_soon_threadsafe(loop.stop)
    t.join()
    # The loop should be closed now:
    print(loop)
    

    Prints:

    worker starting
    worker ending
    Running...
    Running...
    Running...
    Running...
    Future was cancelled
    Task was cancelled
    <ProactorEventLoop running=False closed=True debug=False>
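
If this pattern is needed in more than one place, the same steps can be packaged in a small context manager. The following is a sketch under assumptions, not an asyncio API: BackgroundLoop, submit, _cancel_pending and ticker are hypothetical names. It also adds one step that the code above does not have: before stopping the loop, it cancels any tasks that are still pending and waits for them to unwind, so that their cancellation handlers run before the loop is closed.

```python
import asyncio
import threading
import time
from concurrent.futures import CancelledError


class BackgroundLoop:
    """Hypothetical helper (not part of asyncio): an event loop in its own thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run)

    def _run(self):
        # Run and close the loop in the same thread.
        try:
            self.loop.run_forever()
        finally:
            self.loop.close()

    async def _cancel_pending(self):
        # Cancel whatever is still running and wait until it has unwound.
        current = asyncio.current_task()
        pending = [t for t in asyncio.all_tasks() if t is not current]
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

    def submit(self, coro):
        # Thread-safe way to schedule a coroutine on the background loop.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        self.submit(self._cancel_pending()).result()
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()  # returns once close() has completed


async def ticker(log):
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


log = []
with BackgroundLoop() as bg:
    future = bg.submit(ticker(log))
    time.sleep(0.2)
    bg.loop.call_soon_threadsafe(future.cancel)
    try:
        future.result()
    except CancelledError:
        log.append("caller saw cancellation")

print(bg.loop.is_closed())
```

Leaving the with block stops the loop, joins the thread, and therefore guarantees that close has finished before the program continues.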