I want to know if it's safe to create an asyncio event loop in one thread and run the loop in another while having the ability to cancel it from outside the thread in which the event loop is running. I want to run an async function in a thread while also having ability to cancel it from outside the thread.
Here is the code I am currently using. I want to know if this is the correct approach or if there is a simpler/better way to do this:
import asyncio
from concurrent.futures import ThreadPoolExecutor, CancelledError
import time
pool = ThreadPoolExecutor()
loop = asyncio.new_event_loop()
def start_event_loop(loop):
asyncio.set_event_loop(loop) # is this necessary?
loop.run_forever()
pool.submit(start_event_loop, loop)
async def long_task():
try:
while True:
print("Running...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
future = asyncio.run_coroutine_threadsafe(long_task(), loop)
time.sleep(5)
loop.call_soon_threadsafe(future.cancel)
try:
future.result()
except CancelledError:
time.sleep(1)
print("Future was cancelled")
loop.call_soon_threadsafe(loop.stop) # can i reuse the loop?
while loop.is_running():
print("Wating for loop to stop...")
time.sleep(0.1)
loop.close() # is this necessary?
print("Loop stopped")
This is based on this article and this question.
Also, there are few more questions I'd like to ask:
Is it necessary to use asyncio.set_event_loop
? Both of them produce the same output.
Is it okay to reuse the event loop after stopping it or should I create a new loop each time after stopping the loop?
Is it necessary to close the loop after you are done with it?
No, you do not need to call asyncio.set_event_loop
nor do you need to call loop.close()
.
But the comment # can i reuse the loop?
after you have stopped the loop is a bit baffling to me. Yes, you could reuse the loop after calling close
but how would you reuse the loop in a way that is different than the way it is currently being used? I would simply not stop the loop at all. If it has no work to do then the thread it is running in should not be using any CPU cycles. I would just leave the loop "running" (albeit idle) so if you should again need to run a task in a thread other than the main one, you already the necessary running loop.
In fact, the code could be simplified to:
import asyncio
from concurrent.futures import CancelledError
import time
import threading
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
async def long_task():
try:
while True:
print("Running...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
future = asyncio.run_coroutine_threadsafe(long_task(), loop)
time.sleep(5)
loop.call_soon_threadsafe(future.cancel)
try:
future.result()
except CancelledError:
# Why is this call to time.sleep being made?
#time.sleep(1)
print("Future was cancelled")
Prints:
Running...
Running...
Running...
Running...
Running...
Future was cancelled
Task was cancelled
Update
First, after testing the above code on Linux (I had been testing only under Windows), it now seems that you do need to cancel a future from the thread that is running the event loop by using for example loop.call_soon_threadsafe(future.cancel)
.
If for some reason you need to close the loop, this has to be done after the loop has been stopped and the original invocation of asyncio.loop.run_forever()
in the child thread has returned. To be completely safe, close
should be called from the same thread in which the loop had been running. This is because close
attempts to call shutdown
on any default executor that may have been started in response to a call to asyncio.loop.run_in_executor(None, some_function)
and this may hang if called from a different thread.
So now we need a new runner
function that calls asyncio.loop.run_forever()
and then call asyncio.loop.close()
. Here are the changes we need to make:
runner
that will call asyncio.loop.run_forever
followed by asyncio.loop.close
. The asyncio.loop.run_forever
call will terminate eventually in response to asyncio.loop.stop
being called. Since the stop
method only sets a flag to tell the loop to stop running, you should be able to call this function from any thread. But I have had an issue where the code hangs under Linux. So you should call stop
in the thread running the event loop with loop.call_soon_threadsafe(loop.stop)
.close
has completed, the child thread will terminate. We will know that the close has completed by joining the child thread.import asyncio
from concurrent.futures import CancelledError
import time
import threading
def runner(loop):
loop.run_forever()
loop.close()
loop = asyncio.new_event_loop()
t = threading.Thread(target=runner, args=(loop,))
t.start()
def worker():
print('worker starting')
time.sleep(1)
print('worker ending')
async def long_task():
try:
await asyncio.get_running_loop().run_in_executor(None, worker)
while True:
print("Running...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
future = asyncio.run_coroutine_threadsafe(long_task(), loop)
time.sleep(5)
loop.call_soon_threadsafe(future.cancel)
try:
future.result()
except CancelledError:
print("Future was cancelled")
# Now there are no tasks running in the event loop that need awaiting
# Stop the event loop (call stop in the thread running the loop):
loop.call_soon_threadsafe(loop.stop)
t.join()
# The loop should be closed now:
print(loop)
Prints:
worker starting
worker ending
Running...
Running...
Running...
Running...
Future was cancelled
Task was cancelled
<ProactorEventLoop running=False closed=True debug=False>