Tags: python-asyncio, python-multithreading, threadpoolexecutor

How to run `run_in_executor` in different threads for asyncio?


Suppose we have a synchronous method like the one below:

def sync_method(param1, param2):
    # Complex method logic
    return "Completed"

I want to call this method from an async method via run_in_executor on the current event loop. For example:

async def run_sync_in_executor(param1, param2, pool=None):
    loop = asyncio.get_event_loop()
    value = loop.run_in_executor(pool, sync_method, param1, param2)
    # Some further changes to the variable `value`
    return value

Now I want to run the above method for each entry in a list of params and eventually modify the final output. One approach that I thought would work, but doesn't, is asyncio.gather:

def main():
    params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
    output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])

As I understand the docs, this didn't work because run_sync_in_executor accesses the current event loop, which is shared by all the calls passed to gather. Since there can be only one thread per event loop, and gather starts the next call before the first one has finished, the next call tries to access the same event loop, which causes an error.

As a solution, I thought of using ThreadPoolExecutor, which presumably creates as many threads as the max_workers argument specifies, so that each call can use the pool when it is executed. I was expecting something like this:

with ThreadPoolExecutor(max_workers=8) as executor:
    for param in params_list:
        future = executor.submit(run_sync_in_executor, param[0], param[1], executor)
        print(future.result())

But the above doesn't work either. What is the best way to achieve the desired goal?


Solution

  • Your code has several mistakes: you did not await run_in_executor, and main should be an async function. Working solution:

    import asyncio
    import time
    
    
    def sync_method(param1, param2):
        """Some sync function"""
        time.sleep(5)
        return param1 + param2 + 10000
    
    
    async def ticker():
        """Just to show that the sync method does not block the event loop"""
        while True:
            await asyncio.sleep(1)
            print("Working...")
    
    
    async def run_sync_in_executor(param1, param2, pool=None):
        """Wrapper around run in executor"""
        loop = asyncio.get_running_loop()
        # run_in_executor must be awaited; without await it
        # just returns a Future (not its result!)
        value = await loop.run_in_executor(pool, sync_method, param1, param2)
        return value
    
    
    async def amain():
        """Main should be an async function!"""
        params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
        # Runs concurrently; keep a reference so the task is not garbage-collected
        ticker_task = asyncio.create_task(ticker())
        output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
        print(output)
        ticker_task.cancel()  # stop the ticker once all results are in
    
    if __name__ == '__main__':
        asyncio.run(amain())
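
The solution above leaves pool as None, so the event loop's default executor is used. If you want to control the number of worker threads, as the question attempted, one possible sketch is to create a ThreadPoolExecutor yourself and pass it through the pool argument. The thread_name_prefix value "worker", the shortened sleep, and returning the thread name are illustrative assumptions, added only so you can see which thread handled each call:

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def sync_method(param1, param2):
    """Stand-in for the blocking function; also reports the thread that ran it."""
    time.sleep(0.1)  # shortened sleep, for illustration only
    return param1 + param2 + 10000, threading.current_thread().name


async def run_sync_in_executor(param1, param2, pool=None):
    """Run sync_method in the given pool (None means the default executor)."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, sync_method, param1, param2)


async def amain():
    params_list = [[1, 2], [2, 3], [3, 4], [4, 5]]
    # The pool is created once and shared by every call passed to gather.
    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="worker") as pool:
        results = await asyncio.gather(
            *[run_sync_in_executor(a, b, pool) for a, b in params_list]
        )
    return results


if __name__ == "__main__":
    for value, thread_name in asyncio.run(amain()):
        print(value, thread_name)
```

gather returns the results in the same order as params_list, regardless of which worker thread finishes first. On Python 3.9 and later, asyncio.to_thread(sync_method, a, b) is a shorter alternative when the default executor is good enough.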