Tags: python, numpy, mpi, mpi4py

Using MPI correctly with multithreaded NumPy functions


After reading the mpi4py documentation, I can't find any information about using MPI with multithreaded NumPy applications. Most of the mpi4py documentation related to NumPy covers data-movement operations (e.g., Bcast, Scatter, Gather), but I'm interested in parallelizing heavier computational procedures, for instance linear system solves with np.linalg.solve, which I understand can use multiple cores/threads when NumPy is linked against a multithreaded BLAS/LAPACK implementation. What's missing from the documentation is guidance on how many MPI processes to use when each process may itself use multiple threads/cores, as in the example below. Similarly, if there were a way to determine the exact number of threads/cores used by a given NumPy procedure, then presumably some basic arithmetic could determine how many MPI processes to use.

How can I reason about the number of MPI processes that can/should be used given the goal is to parallelize a NumPy program which itself is using multiple threads?

The following basic example on my laptop (with support for 12 threads) has a runtime that scales essentially linearly with the number of MPI processes launched, leading me to believe the work is being serialized behind the scenes, most likely because the BLAS/LAPACK routine is already using multiple cores/threads.

import time

from mpi4py import MPI
import numpy as np
import numpy.random as npr

world = MPI.COMM_WORLD
world_size: int = world.Get_size()
rank: int = world.Get_rank()
master: bool = (rank == 0)

# Each rank builds and solves its own n x n system.
n: int = 8_000  # corresponds to ~2.5s runtime on my laptop
A: np.ndarray = npr.randint(0, 10, (n, n))
b: np.ndarray = npr.randint(0, 10, (n,))

start: float = time.perf_counter()
_ = np.linalg.solve(A, b)
runtime: float = time.perf_counter() - start

if master:
    print(f"{runtime=:.3f}s")

I would expect launching this with mpiexec -n 2 to have the same runtime as mpiexec -n 1, but this is not the case: it takes approximately twice as long. Note that if I replace np.linalg.solve with a function like time.sleep, using twice as many processes does not increase the runtime.

EDIT: To clarify what seems to be a misunderstanding here, this example is meant as a stand-in for an application in which each process completes a distinct, independent linear system solve (one per process). I.e., the data (A, b) would be distinct for each process.


Solution

  • How can I reason about the number of MPI processes that can/should be used given the goal is to parallelize a NumPy program which itself is using multiple threads?

    In general, you want the NumPy parallelism level times the MPI parallelism level to equal the number of logical cores.

    A reasonable starting guess, when you have two nested levels of parallelism, is to set the number of MPI processes to the square root of the number of logical cores (rounded down to the nearest integer), and the NumPy thread count to the number of logical cores divided by the number of MPI processes (again rounded down).

    For your case, that would result in 3 MPI processes, each with a NumPy threadpool containing 4 threads.

    However, I would emphasize that this is just a starting point, and you should determine what works best for your application experimentally. At minimum, you should also try the two extremes: full NumPy parallelism with no MPI parallelism, and full MPI parallelism with no NumPy parallelism.
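    The heuristic above can be computed directly. As a concrete sketch (the 12 logical cores are taken from the question; in practice you would query `os.cpu_count()`):

```python
import math

logical_cores = 12  # from the question; in practice, use os.cpu_count()

# Split the core budget between the two nested levels of parallelism.
mpi_procs = math.isqrt(logical_cores)       # floor(sqrt(12)) = 3
numpy_threads = logical_cores // mpi_procs  # 12 // 3 = 4

print(f"mpiexec -n {mpi_procs} with {numpy_threads} BLAS threads per process")
```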

    Similarly, if there was a way to determine the exact number of threads/cores used by a given NumPy procedure, then presumably some basic arithmetic could be used to determine how many MPI processes to use.

    In general, if you do not configure it, NumPy will use all cores for algorithms which can be parallelized, and one core for algorithms which cannot be parallelized.

    The algorithms which can use multiple cores are not usually documented, and may change in different versions of your BLAS library. For that reason, I suggest checking experimentally.

    If you want to check experimentally whether a particular NumPy algorithm uses multiple cores, you can measure wall-clock time using time.perf_counter_ns() and CPU time using time.process_time_ns(). The change in CPU time divided by the change in wall-clock time gives the average concurrency of the process.

    Example:

    import time

    import numpy as np


    def measure_func(func, repeat=10):
        # Let any earlier threadpool activity settle before measuring.
        time.sleep(1)
        real_start = time.perf_counter_ns()
        cpu_start = time.process_time_ns()
        for _ in range(repeat):
            func()
        real_end = time.perf_counter_ns()
        cpu_end = time.process_time_ns()
        real_delta = real_end - real_start
        cpu_delta = cpu_end - cpu_start
        print(f"Time taken: {real_delta / 1e9:.3f}s")
        print(f"CPU time taken: {cpu_delta / 1e9:.3f}s")
        # CPU time / wall time ~= average number of cores in use.
        print(f"Average concurrency: {cpu_delta / real_delta:.3f}")


    A = np.random.rand(1000, 1000)
    b = np.random.rand(1000, 1000)

    print("Matrix solve")
    measure_func(lambda: np.linalg.solve(A, b))
    print("Adding matrix")
    measure_func(lambda: A + b, repeat=1000)

    NumPy's default choice of either all cores or a single core is usually not optimal, so I recommend tuning it. You can do this either via the environment variable OMP_NUM_THREADS or via the threadpoolctl library. I recommend threadpoolctl: it lets you change the threadpool size while your program is running, and is generally more convenient. As an exercise, try changing the above program so that np.linalg.solve() uses only one core.
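    For the environment-variable route, note that OMP_NUM_THREADS typically must be set before NumPy is first imported, because the BLAS threadpool is sized when the library loads; threadpoolctl avoids this restriction (e.g. wrapping the call in `threadpool_limits(limits=1, user_api="blas")`). A minimal sketch of the environment-variable approach:

```python
import os

# Must happen before NumPy is imported; changing it afterwards
# usually has no effect on an already-created BLAS threadpool.
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np

A = np.random.rand(500, 500)
b = np.random.rand(500)
x = np.linalg.solve(A, b)  # now runs on a single BLAS thread
print(np.allclose(A @ x, b))
```

    With OMP_NUM_THREADS=1, the measure_func example above should report an average concurrency close to 1 for np.linalg.solve.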