I want to run some function in the background, but be able to shut it down at will. I tried a bunch of solutions, the most obvious of which is asyncio
tasks:
# Data source
DataSource = Generator[int, Any, None]
def create_data_source() -> DataSource
...
# Data processing - should be able to run in the background until being shut down
# Data processing - should be able to run in the background until being shut down
async def data_processor(source: DataSource) -> None:
    # NOTE(review): this `for` loop is fully synchronous -- it contains no
    # `await` -- so once it starts, control never returns to the event loop.
    # That is exactly why the task cannot be cancelled (the crux of the
    # question): Task.cancel() only delivers CancelledError at an await point.
    try:
        for item in source:
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print("Cancelled")
async def main():
    # Run the function
    data_source = create_data_source()
    processing_task = asyncio.create_task(data_processor(data_source))
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    # NOTE(review): cancel() only *requests* cancellation; the task is never
    # awaited afterwards, and data_processor never reaches an await point,
    # so the request is never delivered.
    processing_task.cancel()

asyncio.run(main())
The problem is - once the task starts, the "Working" loop never terminates. I tried making the loop async, I tried sending it a shutdown signal using asyncio.Event
, and I even tried rewriting the function using concurrent.futures
, but nothing works. What am I missing?
Note:
Your data_processor
coroutine, which never issues an await
, seems to be a poor candidate for an async function. You should consider running it as a sync function using run_in_executor
using either a thread pool (the default) or a multiprocessing pool created with concurrent.futures.ProcessPoolExecutor
according to its CPU usage. For example:
Using a Multithreading Pool
import asyncio
import time
from threading import Event
# Data processing - should be able to run in the background until being shut down
# Data processing - should be able to run in the background until being shut down
def data_processor(stop_event) -> None:
    """Spin in a worker thread, printing a heartbeat, until *stop_event* is set."""
    started = time.time()
    try:
        # Condensed form of `while True: if stop_event.is_set(): break`:
        # keep looping until the caller signals shutdown.
        while not stop_event.is_set():
            print("Working")
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {started}, ended at {time.time()}")
async def main():
    """Start the worker on the default thread pool, run ~3 s, then stop it."""
    loop = asyncio.get_running_loop()
    stop_event = Event()
    # Passing None selects the loop's default ThreadPoolExecutor;
    # we don't need a larger pool for a single worker.
    future = loop.run_in_executor(None, data_processor, stop_event)
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    stop_event.set()
    await future  # Wait for the worker to finish

if __name__ == '__main__':
    asyncio.run(main())
Prints:
Working
Working
...
Working
Started at 1738589602.7931993, ended at 1738589605.79333
Using a Multiprocessing Pool
Here we cannot directly pass a multiprocessing.Event
instance to the worker function and must instead use a pool initializer to initialize each pool process with a global reference to the event:
import asyncio
import time
from multiprocessing import Event
from concurrent.futures import ProcessPoolExecutor
def init_pool(_stop_event: Event) -> None:
    # Pool initializer: runs once inside each worker process and stashes the
    # inherited multiprocessing Event in a module-level global, because the
    # Event cannot be passed to the worker function as a task argument.
    # NOTE(review): multiprocessing.Event is a factory function, not a class,
    # so using it as an annotation is informal -- confirm intent.
    global stop_event
    stop_event = _stop_event
# Data processing - should be able to run in the background until being shut down
def data_processor() -> None:
start_time = time.time()
try:
while True:
if stop_event.is_set():
break
print("Working")
except Exception as e:
print("Exception", e)
raise
finally:
print(f"Started at {start_time}, ended at {time.time()}")
async def main():
    """Start the worker in a one-process pool, run ~3 s, then stop it."""
    stop_event = Event()
    # We don't need a larger pool: a single worker process, with the
    # initializer handing it the Event (which cannot be a task argument).
    executor = ProcessPoolExecutor(1, initializer=init_pool, initargs=(stop_event,))
    try:
        loop = asyncio.get_running_loop()
        awaitable = loop.run_in_executor(executor, data_processor)
        # Do other stuff
        await asyncio.sleep(3)
        # Shut the function down
        stop_event.set()
        await awaitable  # Wait for task to complete
    finally:
        # Fix: the original never shut the pool down, leaking the worker
        # process until interpreter exit; release it explicitly.
        executor.shutdown()

if __name__ == '__main__':
    asyncio.run(main())
Easy, But Perhaps Not the Best Solution
Last and least, you could just insert calls to await asyncio.sleep(0)
to give other async tasks a chance to run:
import asyncio
import time
# Data processing - should be able to run in the background until being shut down
# Data processing - should be able to run in the background until being shut down
async def data_processor() -> None:
    """Busy-loop forever, yielding to the event loop once per iteration."""
    started = time.time()
    try:
        while True:
            print("Working")
            # sleep(0) suspends just long enough for the loop to schedule
            # other tasks -- and to deliver a pending cancellation.
            await asyncio.sleep(0)
    except Exception as e:
        print("Exception", e)
        raise
    finally:
        print(f"Started at {started}, ended at {time.time()}")
async def main():
    """Run the processor in the background for ~3 s, then cancel it."""
    processing_task = asyncio.create_task(data_processor())
    # Do other stuff
    await asyncio.sleep(3)
    # Shut the function down
    processing_task.cancel()
    # Fix: actually wait for the cancellation to be processed; the original
    # returned immediately and relied on asyncio.run()'s shutdown machinery
    # to finish cancelling the leftover task.
    try:
        await processing_task
    except asyncio.CancelledError:
        pass

asyncio.run(main())