Search code examples
pythonmultithreadingpython-asyncioconcurrent.futures

Executing async methods in a ThreadPoolExecutor


I am new to Python and I am working on an existing python project that does execution of some custom scripts that are inputted by the user. The scripts are run as python subprocess

 await asyncio.create_subprocess_exec(...)

Majority of the methods in my project are defined with async keyword with await on the caller. The problem is that at any point in time, there is only one thread running so essentially I am only able to run single shell script and only when that is complete a new execution starts. I am planning to move the execution of the logic (sequence of such async methods) into a ThreadPoolExecutor or ProcessPoolExecutor

I tried converting all the async def methods to simple methods. But inside a few of these methods, there are some calls to other async methods from third party modules. And it fails with SyntaxError: 'await' outside async function . Looks like any async calls should be made from within async def methods. So I cannot really convert all my methods to def

Any suggestions on what to do to make this project multi-threaded ?


Solution

  • Since I am a new contributor I can not add comment yet so let me reply in answer. @Mohan I think your code is already having parallel processing.

    coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)¶ Create a subprocess.

    The limit argument sets the buffer limit for StreamReader wrappers for Process.stdout and Process.stderr (if subprocess.PIPE is passed to stdout and stderr arguments).

    Return a Process instance.

    As you can see it returns a Process object - meaning OS process has been created. I believe that means OS gonna run the subprocess not python and since OS has access to all cores it will be using all available cores.

    edited

    https://docs.python.org/3/library/asyncio-subprocess.html

    Directly from the documentation:

    import asyncio
    
    async def run(cmd):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
    
        stdout, stderr = await proc.communicate()
    
        print(f'[{cmd!r} exited with {proc.returncode}]')
        if stdout:
            print(f'[stdout]\n{stdout.decode()}')
        if stderr:
            print(f'[stderr]\n{stderr.decode()}')
    
    
    async def main():
        await asyncio.gather(
            run('ls /zzz'),
            run('sleep 1; echo "hello"'))
    
    asyncio.run(main())
    

    From Documentation:

    Because all asyncio subprocess functions are asynchronous and asyncio provides many tools to work with such functions, it is easy to execute and monitor multiple subprocesses in parallel.

    For running tasks concurrently all you have to do is use asyncio.gather() function.

    # Without loop
    a = asyncio.create_subprocess_exec(...) 
    b = asyncio.create_subprocess_exec(...)
    await asyncio.gather(a,b)
    
    # Using loop
    tasks = []
    for a,b,c in some_func():
        tasks.append(asyncio.create_subprocess_exec(a,b,c,...))
    
    await asyncio.gather(*tasks)
    

    Unless you want to leverage multiple cores this should just work without even converting async def to def. If you wish to use all cores then check out the below links and play with ProcessPoolExecutor.

    edited

    I am not sure if it is even relevant but I found this method in documentation:

    https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

    https://docs.python.org/3/library/asyncio-task.html#running-in-threads

    Somewhat related question: How to properly use asyncio run_coroutine_threadsafe function?