Search code examples
pythonpython-3.xpython-asynciothreadpoolexecutor

Working of concurrent.futures.ThreadPoolExecutor max workers when scaling up the application


I am new to Python programming. Most of my code is using the asyncio, as I am making the IO calls to the database, though in certain cases I am using the non async methods which are long running like few Pandas framework calls to the database, therefore to avoid the blocking call which restricts scalability, I am using concurrent.futures.ThreadPoolExecutor to execute the blocking method as follows:

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
      values = executor.map(func, data)

The func above is supplied the data collection which is at max of length 2, basically requiring no more than 2 threads, but when multiple users come in and application needs to scale, at that time, what shall be ideal max_workers value:

  1. Shall it be required by each user, which is 2
  2. Shall it be max possible value, which as mentioned on the link - https://docs.python.org/3/library/concurrent.futures.html

Changed in version 3.8: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

  1. Shall I not mention it at all and it can be spawned as per requirement

Main point remains, if 10 users start doing the same operation do they end up using the same ThreadPoolExecutor(shared) or do they end up getting the different executor as this is not shared object. I wanted to ensure that when scaled up application doesn't suffer due to incorrect design


Solution

  • If you call ThreadPoolExecutor from the async code you should use asyncio run_in_executor function, otherwise it will block the main event loop.

    If the extra workload is CPU bound then you should also use ProcessPoolExecutor instead.

    Example from Python docs:

    import asyncio
    import concurrent.futures
    
    def cpu_bound():
        # CPU-bound operations will block the event loop:
        # in general it is preferable to run them in a
        # process pool.
        return sum(i * i for i in range(10 ** 7))
    
    async def main():
        loop = asyncio.get_running_loop()
    
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, cpu_bound)
            print('custom process pool', result)
    
    asyncio.run(main())
    

    When it comes to max_workers, the default value is usually fine:

    • ThreadPoolExecutor:

      • os.cpu_count() * 5 from Python 3.5 ;
      • min(32, os.cpu_count() + 4) from Python 3.8 ;
      • min(32, (os.process_cpu_count() or 1) + 4) from Python 3.13 ;
    • ProcessPoolExecutor: os.cpu_count() or 1 (os.process_cpu_count() or 1 from Python 3.13).

    It depends on your workload (CPU vs. I/O bound), the default values are higher "assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work", but for CPU bound tasks there is no point to set this to a number greater than available CPUs as it may actually degrade the performance due to context switching etc.

    Both executors are using queues to enqueue and schedule task on available threads/processes.

    Update: Thu 25 Mar 15:17:51 UTC 2021

    The asyncio event loop is single threaded so you'll see the issue when you schedule other coroutines at the same time. As you can see the none-blocking task was blocked for 10s by the blocking executor:

    $ python test.py
    START none-blocking executor: (scheduled: 5.0s)
    START none-blocking: (scheduled: 1.0s)
    START blocking executor: (scheduled: 10.0s)
    END none-blocking executor: (elapsed: 5.0s)
    END blocking executor: (elapsed: 10.0s)
    END none-blocking: (elapsed: 10.0s)
    

    If you run this a few times and the blocking executor will start first, none-blocking task won't be even started before the blocking executor will end:

    $ python test.py
    START none-blocking executor: (scheduled: 5.0s)
    START blocking executor: (scheduled: 10.0s)
    END none-blocking executor: (elapsed: 5.0s)
    END blocking executor: (elapsed: 10.0s)
    START none-blocking: (scheduled: 1.0s)
    END none-blocking: (elapsed: 1.0s)
    

    When you comment out the blocking executor you can see that all calls are asynchronous now:

    $ python test.py
    START none-blocking executor: (scheduled: 5.0s)
    START none-blocking: (scheduled: 1.0s)
    END none-blocking: (elapsed: 1.0s)
    END none-blocking executor: (elapsed: 5.0s)
    

    The key takeaway is that once you start writing asynchronous code you can't mix it with synchronous calls.

    test.py:

    import asyncio
    import time
    
    from concurrent.futures import ThreadPoolExecutor
    
    
    def blocking(msg, t):
        t1 = time.perf_counter()
    
        print(f"START {msg}: (scheduled: {t}s)")
        time.sleep(t)
        print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")
    
    
    async def task1(msg, t):
        t1 = time.perf_counter()
    
        print(f"START {msg}: (scheduled: {t}s)")
        await asyncio.sleep(t)
        print(f"END {msg}: (elapsed: {time.perf_counter() - t1:.1f}s)")
    
    
    async def task2(msg, t):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(blocking, msg, t)
            future.result()
    
    
    async def main():
        loop = asyncio.get_running_loop()
    
        aws = [
            task1("none-blocking", 1.0),
            loop.run_in_executor(None, blocking, "none-blocking executor", 5.0),
            task2("blocking executor", 10.0),
        ]
    
        for coro in asyncio.as_completed(aws):
            await coro
    
    
    if __name__ == "__main__":
        asyncio.run(main())