
Creating a stoppable background task in Python


I want to run some function in the background, but be able to shut it down at will. I tried a bunch of solutions, the most obvious of which is asyncio tasks:

import asyncio
from typing import Any, Generator

# Data source
DataSource = Generator[int, Any, None]
def create_data_source() -> DataSource:
    ...

# Data processing - should be able to run in the background until being shut down
async def data_processor(source: DataSource) -> None:
    try:
        for item in source:
            print("Working")

    except Exception as e:
        print("Exception", e)
        raise

    finally:
        print("Cancelled")

async def main():
    # Run the function
    data_source = create_data_source()
    processing_task = asyncio.create_task(data_processor(data_source))

    # Do other stuff
    await asyncio.sleep(3)

    # Shut the function down
    processing_task.cancel()

asyncio.run(main())

The problem is - once the task starts, the "Working" loop never terminates. I tried making the loop async, I tried sending it a shutdown signal using asyncio.Event, and I even tried rewriting the function using concurrent.futures, but nothing works. What am I missing?

Note:

  1. I searched through previous Q&As, but haven't found a solution.
  2. This is running on Python 3.11 for dependency reasons, but if newer versions have the solution then I might be able to update.

Solution

  • Your data_processor coroutine never issues an await, so it is a poor candidate for an async function. Consider instead running it as a sync function via run_in_executor, using either a thread pool (the default) or a multiprocessing pool created with concurrent.futures.ProcessPoolExecutor, depending on how CPU-bound it is. For example:

    Using a Multithreading Pool

    import asyncio
    import time
    from threading import Event
    
    # Data processing - should be able to run in the background until being shut down
    def data_processor(stop_event) -> None:
        start_time = time.time()
        try:
            while True:
                if stop_event.is_set():
                    break
                print("Working")
        except Exception as e:
            print("Exception", e)
            raise
        finally:
            print(f"Started at {start_time}, ended at {time.time()}")
    
    async def main():
        stop_event = Event()
        # Passing None selects the event loop's default thread pool executor:
        loop = asyncio.get_running_loop()
        awaitable = loop.run_in_executor(None, data_processor, stop_event)
    
        # Do other stuff
        await asyncio.sleep(3)
    
        # Shut the function down
        stop_event.set()
        await awaitable  # Wait for task to complete
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    Prints:

    Working
    Working
    ...
    Working
    Started at 1738589602.7931993, ended at 1738589605.79333
    

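    Since you are on Python 3.11, asyncio.to_thread (available since Python 3.9) is a thin convenience wrapper around run_in_executor with the default thread pool, so the example above could also be sketched like this (the time.sleep(0.01) is just simulated work, not part of the original):

    ```python
    import asyncio
    import time
    from threading import Event

    def data_processor(stop_event: Event) -> None:
        # Runs in a worker thread; exits once the event is set
        while not stop_event.is_set():
            print("Working")
            time.sleep(0.01)  # simulated work

    async def main():
        stop_event = Event()
        # to_thread runs the sync function in the default thread pool executor
        task = asyncio.create_task(asyncio.to_thread(data_processor, stop_event))

        # Do other stuff
        await asyncio.sleep(3)

        # Shut the function down
        stop_event.set()
        await task

    asyncio.run(main())
    ```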
    Using a Multiprocessing Pool

    Here we cannot directly pass a multiprocessing.Event instance to the worker function and must instead use a pool initializer to initialize each pool process with a global reference to the event:

    import asyncio
    import time
    from multiprocessing import Event
    from concurrent.futures import ProcessPoolExecutor
    
    def init_pool(_stop_event: Event) -> None:
        global stop_event
    
        stop_event = _stop_event
    
    # Data processing - should be able to run in the background until being shut down
    def data_processor() -> None:
        start_time = time.time()
        try:
            while True:
                if stop_event.is_set():
                    break
                print("Working")
        except Exception as e:
            print("Exception", e)
            raise
        finally:
            print(f"Started at {start_time}, ended at {time.time()}")
    
    async def main():
        stop_event = Event()
        # We don't need a larger pool:
        executor = ProcessPoolExecutor(1, initializer=init_pool, initargs=(stop_event,))
        loop = asyncio.get_running_loop()
        awaitable = loop.run_in_executor(executor, data_processor)
    
        # Do other stuff
        await asyncio.sleep(3)
    
        # Shut the function down
        stop_event.set()
        await awaitable  # Wait for task to complete
    
    if __name__ == '__main__':
        asyncio.run(main())
    
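    One caveat: the executor above is never shut down explicitly. Since the worker has already returned by the time we finish awaiting it, wrapping the executor in a with block (which calls shutdown(wait=True) on exit) reaps the worker process cleanly. A sketch of the same example with that change (prints omitted for brevity):

    ```python
    import asyncio
    import time
    from multiprocessing import Event
    from concurrent.futures import ProcessPoolExecutor

    def init_pool(_stop_event) -> None:
        # Give each pool process a global reference to the event
        global stop_event
        stop_event = _stop_event

    def data_processor() -> None:
        # Busy-works until the parent sets the stop event
        while not stop_event.is_set():
            time.sleep(0.01)  # simulated work

    async def main():
        stop_event = Event()
        loop = asyncio.get_running_loop()
        # shutdown() runs automatically when the with block exits,
        # after the worker has already finished
        with ProcessPoolExecutor(1, initializer=init_pool,
                                 initargs=(stop_event,)) as executor:
            awaitable = loop.run_in_executor(executor, data_processor)

            # Do other stuff
            await asyncio.sleep(3)

            # Shut the function down
            stop_event.set()
            await awaitable

    if __name__ == '__main__':
        asyncio.run(main())
    ```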

    Easy, But Perhaps Not the Best Solution

    Last and least, you could just insert calls to await asyncio.sleep(0) to give other async tasks a chance to run:

    import asyncio
    import time
    
    # Data processing - should be able to run in the background until being shut down
    async def data_processor() -> None:
        start_time = time.time()
        try:
            while True:
                print("Working")
                await asyncio.sleep(0)
        except Exception as e:
            print("Exception", e)
            raise
        finally:
            print(f"Started at {start_time}, ended at {time.time()}")
    
    async def main():
        processing_task = asyncio.create_task(data_processor())
    
        # Do other stuff
        await asyncio.sleep(3)
    
        # Shut the function down
        processing_task.cancel()
    
    asyncio.run(main())
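
    For completeness: cancellation is only delivered at an await point, which is why the original all-sync loop never stopped. With await asyncio.sleep(0) in the loop, processing_task.cancel() raises asyncio.CancelledError inside the coroutine at that point, and you can catch it for cleanup (re-raising it so the task is recorded as cancelled). A minimal sketch:

    ```python
    import asyncio

    async def data_processor() -> None:
        try:
            while True:
                print("Working")
                await asyncio.sleep(0)  # suspension point; cancellation lands here
        except asyncio.CancelledError:
            print("Cancelled")  # clean-up hook; re-raise so the task ends as cancelled
            raise

    async def main():
        processing_task = asyncio.create_task(data_processor())
        await asyncio.sleep(0.1)

        processing_task.cancel()
        try:
            await processing_task  # propagates CancelledError once the task unwinds
        except asyncio.CancelledError:
            pass

    asyncio.run(main())
    ```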