Search code examples
pythonhpcscientific-computingmpi4py

Parallelization within a python object


I am working on a simulation where I need to compute an expensive numerical integral at many different time points. Each integrand is a function of the time it is sampling up to, so I must evaluate each of the points independently. Because each integral is independent of all others, this can be implemented in an embarrassingly parallel fashion.

I would like to run this on an HPC cluster, so I have attempted to parallelize this process using mpi4py; however, my current implementation causes each processor to do the entire calculation (including the scattering to other cores) rather than have only the for loop inside of the object parallelized. As written, with n cores this takes n times as long as with one core (not a good sign...).

Because the only step which takes any amount of time is the computation itself, I would like everything except that specific for loop to run on the root node.

Below is a pseudo-code reduction of my current implementation:

import numpy as np
from mpi4py import MPI

COMM = MPI.COMM_WORLD

class Integrand:

    def __init__(self, t_max, dt, **kwargs):
        self.t_max = t_max
        self.dt = dt
        self.time_sample = np.arange(0, self.t_max, self.dt)
        self.function_args = kwargs
        self.final_result = np.empty_like(self.time_sample)

    def do_integration(self):
        if COMM.rank == 0:
            times_partitioned = split(self.time_sample, COMM.size)
        else:
            times_partitioned = None

        times_partitioned = COMM.scatter(times_partitioned, root=0)

        results = np.empty(times_partitioned.shape, dtype=complex)

        for counter, t in enumerate(times_partitioned):
            results = computation(self, t, **self.function_args)

        results = MPI.COMM_WORLD.gather(results, root=0)

        if COMM.rank is 0:
            ##inter-leaf back together
            for i in range(COMM.size):
                self.final_result[i::COMM.size] = results[i]

if __name__  = '__main__':

    kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]

    for kwargs in kwargs_set:
        integrand_object = Integrand(**kwargs)
        integrand_object.do_integration()
        save_and_plot_results(integrand_object.final_result)


Solution

  • A simple way to parallelize this problem without drastically changing how the class is called/used is to make use of a decorator. The decorator (shown below) makes it so that rather than creating the same object on every core, each core creates an object with the chunk of the time steps it needs to evaluate. After they have all been evaluated it gathers their results and returns a single object with the full result to one core. This particular implementation changes the class functionality slightly by forcing evaluation of the integral at creation time.

    from functools import wraps
    import numpy as np
    from mpi4py import MPI
    
    COMM = MPI.COMM_WORLD
    
    def parallelize_integrand(integral_class):
        def split(container, count):
            return [container[_i::count] for _i in range(count)]
    
        @wraps(integral_class)
        def wrapper(*args,**kwargs):
             int_object = integral_class(*args, **kwargs)
             time_sample_total = int_object.time_sample
             if COMM.rank is 0:
                 split_time = split(time_sample_total,COMM.size)
                 final_result = np.empty_like(int_object.result)
             else:
                 split_time = None
             split_time = COMM.scatter(split_time, root=0)
             int_object.time_sample = split_time
             int_object.do_integration()
             result = int_object.result
             result = COMM.gather(result, root=0)
    
             if COMM.rank is 0:
                 for i in range(COMM.size):
                     final_result[i::COMM.size] = result[i]
                 int_object.time_sample = time_sample_total
                 int_object.result = final_result
                 return int_object
    
    
    @parallelize_integrand
    class Integrand:
    
        def __init__(self, t_max, dt, **kwargs):
            self.t_max = t_max
            self.dt = dt
            self.time_sample = np.arange(0, self.t_max, self.dt)
            self.kwargs = kwargs
            self.result = np.empty_like(self.time_sample)
    
        def do_integration(self):
            for counter, t in enumerate(self.time_sample):
                result[counter] = computation(self, t, **self.kwargs)
    
    
    if __name__  = '__main__':
        kwargs_set = [kwargs1, kwargs2, kwargs3, ..., kwargsN]
        for kwargs in kwargs_set:
            integrand_object = Integrand(**kwargs)
            save_and_plot_results(integrand_object.result)