python multithreading asynchronous async-await python-asyncio

Using loop.run_in_executor to call sync functions from async ones


I have 3 functions: func_1, func_2, and func_3. I would like to run these asynchronously, so that I do not have to wait for func_1 to finish before func_2 starts executing.

The problem is that the definition of func_1, for example, looks something like this:

async def func_1(a, b):
    x = some_sync_func(a)
    y = some_other_sync_func(b)
    z = another_sync_func(x, y)
    return yet_another_sync_func(z)

The functions that I am calling within func_1 are all synchronous functions which are non-awaitable. Thus, they will block the execution of func_2 and func_3.

I read here that loop.run_in_executor() can be used to call synchronous functions from asynchronous functions without blocking the execution. Thus, I modified the definition of func_1 as follows:

async def func_1(a, b):
    loop = asyncio.get_event_loop()
    x = await loop.run_in_executor(None, some_sync_func, a)
    y = await loop.run_in_executor(None, some_other_sync_func, b)
    z = await loop.run_in_executor(None, lambda: another_sync_func(a,b))
    w = await loop.run_in_executor(None, yet_another_sync_func, z)
    return w

Is this the right way to deal with this problem? Am I using loop.run_in_executor() correctly? Here, the docs provide an example which seems to support this. I don't know what threads are, or what a "process pool" is, and haven't really been able to make much sense of the docs.


Solution

  • Almost right. Because you await each call immediately, the line after each await only runs once the awaited call has finished, so the sync functions inside func_1 still run one after another. The event loop itself is not blocked while they run, so func_2 and func_3 can make progress in the meantime.

    However, if you call func_1 concurrently from some other place, two instances of func_1 will run in parallel. (I am almost sure that is not what you want.)

    For the functions inside func_1 to actually run in parallel (in other threads), submit each of them to the executor without awaiting it immediately. run_in_executor returns a future as soon as the call is submitted, so you can collect all the futures you want to run in parallel and await them at once, usually with the aptly named asyncio.gather:

    ...
    
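    async def func_1(a, b):
        loop = asyncio.get_event_loop()
        # Submit the calls without awaiting; each one starts in a worker thread.
        task_x = loop.run_in_executor(None, some_sync_func, a)
        task_y = loop.run_in_executor(None, some_other_sync_func, b)
        # As in the question's second snippet, this call takes a and b, so it
        # does not depend on x or y. If it needs x and y (as in the original
        # func_1), gather x and y first and submit this call afterwards.
        task_z = loop.run_in_executor(None, lambda: another_sync_func(a, b))
        x, y, z = await asyncio.gather(task_x, task_y, task_z)
        # This depends on `z`, so it is not included in the gather.
        # If its return value is not important, you can omit the
        # await, return the future, and await it sometime later.
        w = await loop.run_in_executor(None, yet_another_sync_func, z)
        return w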
    ...
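
To make the difference concrete, here is a minimal, self-contained sketch that times both patterns. Everything in it is an assumption for illustration: slow_square is a hypothetical blocking function standing in for the sync functions in the question, the 0.2 second sleep is arbitrary, and asyncio.get_running_loop() and asyncio.run() (Python 3.7+) are used in place of get_event_loop().

```python
import asyncio
import time


def slow_square(n):
    # Hypothetical blocking function standing in for some_sync_func.
    time.sleep(0.2)
    return n * n


async def sequential(a, b):
    loop = asyncio.get_running_loop()
    # Each call is awaited before the next one is submitted,
    # so the two sleeps happen one after the other.
    x = await loop.run_in_executor(None, slow_square, a)
    y = await loop.run_in_executor(None, slow_square, b)
    return x + y


async def concurrent(a, b):
    loop = asyncio.get_running_loop()
    # Both calls are submitted first and start in worker threads right away;
    # gather then waits for both results together.
    fut_x = loop.run_in_executor(None, slow_square, a)
    fut_y = loop.run_in_executor(None, slow_square, b)
    x, y = await asyncio.gather(fut_x, fut_y)
    return x + y


async def timed(coro):
    # Await a coroutine and return (result, elapsed seconds).
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


async def main():
    seq = await timed(sequential(2, 3))
    con = await timed(concurrent(2, 3))
    return seq, con


if __name__ == "__main__":
    (seq_result, seq_time), (con_result, con_time) = asyncio.run(main())
    print(f"sequential: {seq_result} in {seq_time:.2f}s")
    print(f"concurrent: {con_result} in {con_time:.2f}s")
```

Both versions return the same result, but the sequential one should take roughly twice as long, since its two blocking calls never overlap. Passing None as the executor uses the loop's default thread pool, which suits blocking I/O; CPU-bound work in pure Python would probably not speed up this way because of the GIL.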