I am using joblib to parallelize a function that appends results to a list. The problem I am facing is that each run of the function overwrites the previous results in the list. Here's a simplified version of my code:
import numpy as np
from multiprocessing import Manager
from scipy.integrate import quad
from scipy import stats
from joblib import Parallel, delayed

# Define the two functions needed

# Create a list of lambda functions, one per moment order
def create_moment_funcs_list(moment_orders):
    moment_funcs_list = []
    for moment_order in moment_orders:
        moment_func = lambda x: np.abs(x**moment_order)
        moment_funcs_list.append(moment_func)
    return moment_funcs_list

# Integrate one lambda function and append the result to the shared list
def parallel_integration_loop(funcptr, low_lim, up_lim, temp):
    res, _ = quad(funcptr, low_lim, up_lim)
    temp.append(res)
# Inside the main loop where the code runs
# Create a list of lambda functions
moment_funcs_list = create_moment_funcs_list(moments)
manager = Manager()
temp = manager.list()
# Integrate the lambda functions in parallel using scipy's quad
_ = Parallel(n_jobs=n_jobs)(
    delayed(parallel_integration_loop)(f, -int_limits, int_limits, temp)
    for f in moment_funcs_list
)
estimated_moments = temp
However, when I run this code, the list temp only contains the result from the last run of parallel_integration_loop, instead of all the results. How can I modify this code so that each run appends its result to the list without overwriting previous runs?
I have tried using a Lock object to prevent concurrent writes to the list, but this did not solve the problem. I suspect that the issue has to do with how joblib handles the parallelization of functions, but I am not sure how to address it.
Any suggestions or insights would be greatly appreciated. Thank you!
Why are you even using manager.list()? Just have
def parallel_integration_loop(funcptr, low_lim, up_lim):
    res, _ = quad(funcptr, low_lim, up_lim)
    return res
and then
result = Parallel(n_jobs=n_jobs)(
    delayed(parallel_integration_loop)(f, -int_limits, int_limits)
    for f in moment_funcs_list
)
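As a quick sanity check that Parallel really does collect return values, here is a minimal, self-contained sketch; the square function and n_jobs=2 are made up for illustration, not from the question:

```python
from joblib import Parallel, delayed

def square(x):
    # Each worker simply returns its result; no shared list is needed
    return x * x

# Parallel gathers the return values into a list, in input order
results = Parallel(n_jobs=2)(delayed(square)(i) for i in range(5))
print(results)  # [0, 1, 4, 9, 16]
```

Note that the results come back in the same order as the inputs, so there is no need to track which worker produced which value.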
Update:
The problem is in your lambda statement.
The easiest way to understand this is to look at this code:
>>> functions = [lambda : i for i in range(10)]
>>> functions[0]()
9
>>> functions[9]()
9
Similarly, you write:
for moment_order in moment_orders:
    moment_func = lambda x: np.abs(x**moment_order)
    moment_funcs_list.append(moment_func)
The problem is that there is only a single variable named moment_order (or, in my code, i), and every lambda function is using the exact same variable. By the time you've finished defining all of the functions, moment_order has whatever value it ended with, and that's the one all the lambda functions are going to use.
You need to make sure that each lambda function has its own copy of moment_order.
There's the quick, hacky solution:
    moment_func = lambda x, moment_order=moment_order: np.abs(x**moment_order)
which gives the lambda a second, optional parameter whose default is the value that moment_order had at the time the lambda was created.
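Applied to the toy example above, the default-argument trick behaves as expected:

```python
# Each lambda captures the current value of i as a default argument,
# so every function remembers its own i instead of sharing one variable
functions = [lambda i=i: i for i in range(10)]
print(functions[0]())  # 0
print(functions[9]())  # 9
```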
Easier to read would be:
    def get_moment_function(moment_order):
        return lambda x: np.abs(x**moment_order)
    return [get_moment_function(moment_order) for moment_order in moment_orders]
This explicitly creates a new variable moment_order inside get_moment_function on every call, and the lambda then closes over that fresh variable.
This is a common mistake with lambda functions. They close over the actual variables from the outer scope, not the values those variables had at the time the lambdas were created.
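Putting both fixes together, here is a sketch of the whole pipeline; the moment orders [0, 1, 2], the integration limit of 1.0, and n_jobs=2 are illustrative values, not from the question:

```python
import numpy as np
from joblib import Parallel, delayed
from scipy.integrate import quad

def get_moment_function(moment_order):
    # A fresh moment_order is bound on each call, so each lambda
    # closes over its own copy rather than a shared loop variable
    return lambda x: np.abs(x**moment_order)

def create_moment_funcs_list(moment_orders):
    return [get_moment_function(k) for k in moment_orders]

def integrate_one(func, low_lim, up_lim):
    res, _ = quad(func, low_lim, up_lim)
    return res  # return the value instead of appending to a shared list

moment_funcs_list = create_moment_funcs_list([0, 1, 2])
int_limits = 1.0
estimated_moments = Parallel(n_jobs=2)(
    delayed(integrate_one)(f, -int_limits, int_limits)
    for f in moment_funcs_list
)
print(estimated_moments)  # ≈ [2.0, 1.0, 0.6667], since ∫|x^k|dx over [-1, 1] = 2/(k+1)
```

This works with joblib's default loky backend, which serializes the lambdas with cloudpickle; the standard pickle module used by plain multiprocessing cannot pickle lambdas.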