Tags: python, python-asyncio, concurrent.futures

How to properly use concurrent.futures with asyncio


I am prototyping a FastAPI app with an endpoint that will launch a long-running process using the subprocess module. The obvious solution is to use concurrent.futures with a ProcessPoolExecutor; however, I am unable to get the behavior I want. Code sample:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import subprocess as sb
import time
import random

pool = ProcessPoolExecutor(5)

def long_task(s):
    print("started")
    time.sleep(random.randrange(5, 15))
    sb.check_output(["touch", str(s)])
    print("done")

async def async_task():
    loop = asyncio.get_event_loop()
    print("started")
    tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
    while True:
        print("in async task")
        done, _ = await asyncio.wait(tasks, timeout=1)
        for task in done:
            await task
        await asyncio.sleep(1)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_task())


if __name__ == "__main__":
    main()

This sample works fine on the surface, but the spawned processes do not get stopped after execution completes - I still see all of the Python processes in ps aux | grep python. Shouldn't awaiting a completed task stop it? In the end I do not care much about the result of the execution; it just needs to happen in the background and exit cleanly, without any hanging processes.


Solution

  • You must shut down the ProcessPoolExecutor when you are done using it, either by calling its shutdown() method explicitly, or by using it as a context manager. I used the context manager approach; an explicit shutdown() variant is sketched after the code below.

    I don't know what your subprocess.check_output call would do on Windows (there is no touch command here), so I commented it out.

    I also replaced your infinite loop with a single call to asyncio.gather, which suspends until all of the submitted tasks have finished.

    I'm on Windows, so to observe the creation and deletion of processes I watched the Windows Task Manager. The program creates 5 worker processes and closes them again when the ProcessPoolExecutor context manager exits.

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    # import subprocess as sb
    import time
    import random

    def long_task(s):
        print("started")
        # Simulate a long-running blocking job.
        time.sleep(random.randrange(5, 15))
        # sb.check_output(["touch", str(s)])
        print("done", s)

    async def async_task():
        # Inside a coroutine there is always a running loop.
        loop = asyncio.get_running_loop()
        print("started")
        # The context manager calls pool.shutdown(wait=True) on exit,
        # which reaps the worker processes.
        with ProcessPoolExecutor(5) as pool:
            tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
            await asyncio.gather(*tasks)
        print("Completely done")

    def main():
        asyncio.run(async_task())

    if __name__ == "__main__":
        main()
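
    If you would rather call shutdown() explicitly than use a context manager, here is a minimal sketch of that variant; the shutdown sits in a finally block so the worker processes are reaped even if a task raises:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    import time
    import random

    def long_task(s):
        print("started")
        time.sleep(random.randrange(5, 15))
        print("done", s)

    async def async_task():
        loop = asyncio.get_running_loop()
        pool = ProcessPoolExecutor(5)
        try:
            tasks = [loop.run_in_executor(pool, long_task, i) for i in range(10)]
            await asyncio.gather(*tasks)
        finally:
            # Explicit shutdown: wait for running jobs to finish,
            # then reap the worker processes.
            pool.shutdown(wait=True)

    if __name__ == "__main__":
        asyncio.run(async_task())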
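
    For the FastAPI app mentioned in the question, the same idea applies: keep one executor alive for the lifetime of the app and shut it down when the app stops, so no worker processes linger. A rough sketch, assuming a recent FastAPI version with lifespan support; the route path and job parameters are made up for illustration:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    from contextlib import asynccontextmanager
    import subprocess as sb
    import time
    import random

    from fastapi import FastAPI

    pool = ProcessPoolExecutor(5)

    def long_task(s):
        time.sleep(random.randrange(5, 15))
        sb.check_output(["touch", str(s)])

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        yield
        # Reap the worker processes when the app shuts down.
        pool.shutdown(wait=True)

    app = FastAPI(lifespan=lifespan)

    @app.post("/jobs/{n}")
    async def start_job(n: int):
        loop = asyncio.get_running_loop()
        # Fire and forget: schedule the blocking job on the pool
        # without awaiting its result.
        loop.run_in_executor(pool, long_task, n)
        return {"status": "started"}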