Hi, I've got a computationally heavy function which I run in parallel. I've noticed that when using concurrent.futures.ProcessPoolExecutor (or multiprocessing.Pool), each process slows down as I use more workers (or add more tasks). For example, when I run my code on 2 workers, the average execution time of a single task is around 0.7s, but with 16 workers (the cpu_count of my processor) the average execution time of the same task is 6.7s. A similar thing happens when running the same calculation on more tasks. See the test code and results below.
import concurrent.futures
import os
from time import perf_counter
import numpy as np
def func(foo):
    start_time = perf_counter()
    long_calculation = np.random.random(size=100000000).std()
    stop_time = perf_counter()
    execution_time = stop_time - start_time
    return execution_time
cpu_count = os.cpu_count()
assert cpu_count is not None
print(f"CPU Count: {cpu_count}")
# ======= Increasing Tasks ============
# max_workers = 10
# for tasks in [1, 2, 5, 10, 20, 50, 100, 1000]:
# ======= Increasing Max Workers ============
tasks = 50
for max_workers in range(1, cpu_count + 1):
    with concurrent.futures.ProcessPoolExecutor(max_workers) as pool:
        total_exec_time: float = 0.0
        processes = pool.map(func, range(tasks))
        for process_result in processes:
            total_exec_time += process_result
    print(
        f"{tasks} tasks on {max_workers} workers - average process execution time: {round(total_exec_time / tasks, 2)}s"
    )
Output:

CPU Count: 16
50 tasks on 1 workers - average process execution time: 0.65s
50 tasks on 2 workers - average process execution time: 0.76s
50 tasks on 3 workers - average process execution time: 0.97s
50 tasks on 4 workers - average process execution time: 1.16s
50 tasks on 5 workers - average process execution time: 1.6s
50 tasks on 6 workers - average process execution time: 1.9s
50 tasks on 7 workers - average process execution time: 2.29s
50 tasks on 8 workers - average process execution time: 2.67s
50 tasks on 9 workers - average process execution time: 3.02s
50 tasks on 10 workers - average process execution time: 3.5s
50 tasks on 11 workers - average process execution time: 4.0s
50 tasks on 12 workers - average process execution time: 4.73s
50 tasks on 13 workers - average process execution time: 5.37s
50 tasks on 14 workers - average process execution time: 5.66s
50 tasks on 15 workers - average process execution time: 6.07s
50 tasks on 16 workers - average process execution time: 6.71s
10 workers with 1 tasks - average process execution time: 0.67s
10 workers with 2 tasks - average process execution time: 0.84s
10 workers with 5 tasks - average process execution time: 1.91s
10 workers with 10 tasks - average process execution time: 3.82s
10 workers with 20 tasks - average process execution time: 3.52s
10 workers with 50 tasks - average process execution time: 3.88s
10 workers with 100 tasks - average process execution time: 4.0s
10 workers with 1000 tasks - average process execution time: 3.92s
Overall the whole program still finishes faster with more workers, but each individual task gets progressively slower. I would like the per-task efficiency to stay the same no matter how many workers I use.
NOTE: CPU usage increases linearly with the max_workers parameter and reaches 100% with 15 or 16 workers (idle CPU usage is around 7%), while my RAM always has plenty of free space.
Side note: I get the exact same results using the multiprocessing Pool.
from multiprocessing import Pool
...
with Pool(processes=max_workers) as pool:
    ...
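For completeness, a minimal self-contained version of that Pool variant looks roughly like this. It is only a sketch that reuses the same func and worker sweep as above, with a __main__ guard added so it also works under the spawn start method that macOS uses by default:

from multiprocessing import Pool
from time import perf_counter
import os
import numpy as np

def func(foo):
    # Same workload as above: allocate a large random array and reduce it.
    start_time = perf_counter()
    np.random.random(size=100000000).std()
    return perf_counter() - start_time

def main():
    tasks = 50
    for max_workers in range(1, os.cpu_count() + 1):
        with Pool(processes=max_workers) as pool:
            total_exec_time = sum(pool.map(func, range(tasks)))
        print(f"{tasks} tasks on {max_workers} workers - average process execution time: {round(total_exec_time / tasks, 2)}s")

if __name__ == "__main__":
    main()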
By modifying the code to do purely CPU-intensive work rather than a combination of CPU and RAM stress, we can show (with little doubt) that the slowdown comes from memory pressure: each of your tasks allocates and scans a 100-million-element float array (~800 MB), so the more workers run concurrently, the more they compete for memory allocation and bandwidth.
from time import perf_counter
from concurrent.futures import ProcessPoolExecutor
from random import random
SIZE = 5_000_000
def func(_):
    start_time = perf_counter()
    for _ in range(SIZE):
        random()
    return perf_counter() - start_time

def main():
    tasks = 50
    for max_workers in range(2, 10):
        with ProcessPoolExecutor(max_workers) as pool:
            total_exec_time = sum(pool.map(func, range(tasks)))
        print(f"{tasks} tasks on {max_workers} workers - average process execution time: {round(total_exec_time / tasks, 2)}s")

if __name__ == '__main__':
    main()
Output:
50 tasks on 2 workers - average process execution time: 0.22s
50 tasks on 3 workers - average process execution time: 0.22s
50 tasks on 4 workers - average process execution time: 0.23s
50 tasks on 5 workers - average process execution time: 0.23s
50 tasks on 6 workers - average process execution time: 0.23s
50 tasks on 7 workers - average process execution time: 0.24s
50 tasks on 8 workers - average process execution time: 0.24s
50 tasks on 9 workers - average process execution time: 0.24s
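If you want to confirm this on the original NumPy workload, one option is to keep the same total amount of work but generate and reduce the samples in small chunks, so each worker's working set stays roughly cache-sized instead of ~800 MB. The sketch below is only an illustration of that idea (the chunk size and the manual std computation are my choices, not part of the original code); absolute numbers will vary per machine, but the per-task time should degrade much less as workers are added:

from time import perf_counter
from concurrent.futures import ProcessPoolExecutor
import numpy as np

TOTAL = 100_000_000   # same sample count as the original benchmark
CHUNK = 100_000       # ~800 KB of float64 per chunk, small enough for per-core cache

def func_chunked(_):
    start = perf_counter()
    n = 0
    total = 0.0
    total_sq = 0.0
    for _ in range(TOTAL // CHUNK):
        chunk = np.random.random(size=CHUNK)
        n += chunk.size
        total += chunk.sum()
        total_sq += np.square(chunk).sum()
    # Population std from the accumulated sums (matches ndarray.std(), ddof=0);
    # computed and discarded, like long_calculation in the original.
    mean = total / n
    std = (total_sq / n - mean * mean) ** 0.5
    return perf_counter() - start

def main():
    tasks = 50
    for max_workers in (2, 8, 16):
        with ProcessPoolExecutor(max_workers) as pool:
            total_exec_time = sum(pool.map(func_chunked, range(tasks)))
        print(f"{tasks} tasks on {max_workers} workers - average process execution time: {round(total_exec_time / tasks, 2)}s")

if __name__ == "__main__":
    main()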
Notes:
Python 3.11.2
macOS 13.2.1 (Ventura)
CPU 3GHz 10-Core Intel Xeon W
RAM 32GB