Tags: python, lambda, parallel-processing, multiprocessing, joblib

Appending to a list without overwriting previous runs when using joblib to parallelize functions


I am using joblib to parallelize a function that appends results to a list. The problem I am facing is that each run of the function overwrites the previous results in the list. Here's a simplified version of my code:

import numpy as np
from multiprocessing import Manager
from scipy.integrate import quad
from scipy import stats
from joblib import Parallel, delayed


# Define the two functions needed

# Create a list of lambda functions
def create_moment_funcs_list(moment_orders):
    moment_funcs_list = []
    for moment_order in moment_orders:
        moment_func = lambda x: np.abs(x**moment_order)
        moment_funcs_list.append(moment_func)
    return moment_funcs_list


# Integrate lambda functions
def parallel_integration_loop(funcptr, low_lim, up_lim, temp):
    res, _ = quad(funcptr, low_lim, up_lim)
    temp.append(res)
    
   
# Inside the main loop where the code runs

# Create a list of lambda functions
moment_funcs_list = create_moment_funcs_list(moment_orders)


manager = Manager()
temp = manager.list()



# Main call: integrate the lambda functions using scipy's quad
_ = Parallel(n_jobs=n_jobs)(
    delayed(parallel_integration_loop)(f, -int_limits, int_limits, temp) for f in moment_funcs_list
)
estimated_moments = temp

However, when I run this code, the list temp only contains the results from the last run of parallel_integration_loop, instead of all the results. How can I modify this code so that each run of parallel_integration_loop appends its results to temp without overwriting previous runs?

I have tried using a Lock object to prevent concurrent writes to results, but this did not solve the problem. I suspect that the issue has to do with how joblib handles the parallelization of functions, but I am not sure how to address it.

Any suggestions or insights would be greatly appreciated. Thank you!


Solution

  • Why are you even using manager.list()? Just have:

    def parallel_integration_loop(funcptr, low_lim, up_lim):
        res, _ = quad(funcptr, low_lim, up_lim)
        return res
    

    and then

    results = Parallel(n_jobs=n_jobs)(
        delayed(parallel_integration_loop)(f, -int_limits, int_limits) for f in moment_funcs_list
    )
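
    For example, here is a minimal runnable sketch of this pattern; the integrands, limits, and n_jobs value are placeholders for illustration, not taken from the original code:

        import numpy as np
        from joblib import Parallel, delayed
        from scipy.integrate import quad

        def integrate_one(func, low_lim, up_lim):
            # quad returns (value, estimated_error); keep only the value
            res, _ = quad(func, low_lim, up_lim)
            return res

        # A few stand-in integrands (hypothetical, for illustration only)
        funcs = [np.sin, np.cos, np.exp]

        # Parallel collects the return values into a list, in input order
        results = Parallel(n_jobs=2)(
            delayed(integrate_one)(f, 0.0, 1.0) for f in funcs
        )
        print(results)  # e.g. [0.4596..., 0.8414..., 1.7182...]

    Because Parallel preserves the order of its inputs, results[i] corresponds to funcs[i], with no shared state to coordinate.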
    

    Update:

    The problem is in your lambda expressions.

    The easiest way to understand this is to look at this code:

    >>> functions = [lambda : i for i in range(10)]
    >>> functions[0]()
    9
    >>> functions[9]()
    9
    

    Similarly, you write:

        for moment_order in moment_orders:
            moment_func = lambda x: np.abs(x**moment_order)
            moment_funcs_list.append(moment_func)
    

    The problem is that there is only a single variable named moment_order (or, in my code, i), and every lambda function is using the exact same variable. By the time all of the functions have been defined, moment_order has a particular value, and that is the value that every one of the lambda functions is going to use.

    You need to make sure that each lambda function has its own copy of moment_order.

    There's the quick, hacky solution:

        moment_func = lambda x, moment_order=moment_order: np.abs(x**moment_order)
    

    which gives the lambda a second, optional parameter that is bound to the value moment_order had at the time the lambda was created.
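
    Applied to the toy example above, the difference is easy to see:

    >>> functions = [lambda i=i: i for i in range(10)]
    >>> functions[0]()
    0
    >>> functions[9]()
    9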

    Easier to read would be:

      def get_moment_function(moment_order):
          return lambda x: np.abs(x**moment_order)

      def create_moment_funcs_list(moment_orders):
          return [get_moment_function(moment_order) for moment_order in moment_orders]
    

    This explicitly creates a new variable moment_order inside get_moment_function, which the lambda can then close over.
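
    As a quick sanity check of the rewritten factory above (the moment orders here are just example values):

      funcs = create_moment_funcs_list([1, 2, 3])
      print(funcs[0](2.0), funcs[1](2.0), funcs[2](2.0))  # prints: 2.0 4.0 8.0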

    This is a common mistake with lambda functions: they capture the variables themselves from the enclosing scope, not the values those variables had at the time the lambda was created.
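
    Putting both fixes together, a self-contained sketch might look like this; the moment orders, int_limits, and n_jobs are placeholder values, and the plain |x**n| integrand from the question is kept as-is:

        import numpy as np
        from joblib import Parallel, delayed
        from scipy.integrate import quad

        def get_moment_function(moment_order):
            # A fresh local moment_order is created per call for the lambda to close over
            return lambda x: np.abs(x**moment_order)

        def create_moment_funcs_list(moment_orders):
            return [get_moment_function(moment_order) for moment_order in moment_orders]

        def parallel_integration_loop(funcptr, low_lim, up_lim):
            res, _ = quad(funcptr, low_lim, up_lim)
            return res

        moment_orders = [1, 2, 3, 4]   # placeholder moment orders
        int_limits, n_jobs = 10, 2     # placeholder values

        moment_funcs_list = create_moment_funcs_list(moment_orders)
        estimated_moments = Parallel(n_jobs=n_jobs)(
            delayed(parallel_integration_loop)(f, -int_limits, int_limits)
            for f in moment_funcs_list
        )
        print(estimated_moments)  # four distinct values, one per moment order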