Tags: python, parallel-processing, multiprocessing, joblib

Parallelize loop with "non-mappable" function in Python


I numerically solve an integral equation on the time interval from 0 to Tmax with a predefined step dT. I do it in a for loop:

list_of_values = []

for i in range(dT, Tmax + dT, dT):
    func_at_t = my_fancy_solver(func_at_t)
    list_of_values.append(func_at_t)

The function my_fancy_solver takes one argument and looks like this inside:

def my_fancy_solver(func_at_prev_t):
    while True:
        # iterate in the while loop until convergence is achieved:
        # compute approx_error and func_at_new_t_approx
        if approx_error < 10**(-10):
            func_at_new_t = func_at_new_t_approx
            break
    return func_at_new_t 

which means that this solver takes the function value at the previous moment of time, func_at_prev_t, and approximates the unknown value of the function at the next moment of time, func_at_new_t. This happens in the while loop until the desired accuracy is achieved, which is controlled by the approx_error variable. It means that in the line

func_at_t = my_fancy_solver(func_at_t)

(as I understand it) I find the function value at a given moment of time and then use it to find the next value.
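
For concreteness, here is a toy stand-in with the same structure; the update rule inside is made up, since the real step is problem-specific:

def my_fancy_solver(func_at_prev_t):
    # Hypothetical update rule, only to make the structure concrete;
    # the real solver step is problem-specific.
    func_at_new_t_approx = func_at_prev_t
    while True:
        func_at_new_t = 0.5 * (func_at_new_t_approx + 1.0)  # made-up update
        approx_error = abs(func_at_new_t - func_at_new_t_approx)
        func_at_new_t_approx = func_at_new_t
        if approx_error < 10**(-10):
            break
    return func_at_new_t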

I would like to understand how I can parallelize this computation. My confusion is that the function my_fancy_solver is not mappable, so I cannot represent the computation as map(my_fancy_solver, t) where t is a given time value. So I do not understand how to perform the parallelization with multiprocessing or joblib. Can anyone give some advice?


Solution

  • Given your comment quoted below, you may have a reducible computation:

    I use the previously found function_at_t in the next iteration

    Hence, consider functools.reduce, one of the higher-order functions (map/reduce/filter), or itertools.accumulate to keep the intermediate values in a list.
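
    For intuition, here is the difference between the two on plain addition (standard-library behavior): reduce folds everything down to one value, while accumulate yields the running results.

    from functools import reduce
    from itertools import accumulate

    reduce(lambda a, b: a + b, [1, 2, 3, 4])            # 10
    list(accumulate([1, 2, 3, 4], lambda a, b: a + b))  # [1, 3, 6, 10]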

    Because reduce and accumulate expect a function of two arguments, the code below uses a small wrapper function to accumulate the results; it also assumes that your function, my_fancy_solver, receives the time as its only required argument.

    from functools import reduce
    from itertools import accumulate

    ...

    def my_fancy_process(accum_value, time):
        # Fold step: add the solver result at this time to the running value.
        return accum_value + my_fancy_solver(time)

    # reduce takes its arguments positionally: (function, iterable, initializer).
    final_single_value = reduce(
        my_fancy_process, range(dT, Tmax + dT, dT), 0
    )

    # accumulate returns an iterator; wrap it in list() to keep every
    # intermediate result.
    list_of_values = list(accumulate(
        range(dT, Tmax + dT, dT), func=my_fancy_process, initial=0
    ))
    

    Search online for examples of how to parallelize reduce and accumulate calls.
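
    As a minimal sketch of one such approach, assuming (as above) that my_fancy_solver receives the time as its only required argument, so the expensive calls are independent of each other: run them in parallel with the standard library's multiprocessing.Pool and keep only the cheap running sum sequential (dT, Tmax, and my_fancy_solver as defined earlier).

    from itertools import accumulate
    from multiprocessing import Pool

    if __name__ == "__main__":
        times = range(dT, Tmax + dT, dT)
        with Pool() as pool:
            # Independent, expensive solver calls run in parallel workers.
            solved = pool.map(my_fancy_solver, times)
        # The cheap, order-dependent summation stays sequential.
        list_of_values = list(accumulate(solved, initial=0))

    If instead each step truly needs the previous step's result, as in the recurrence in the question, the time loop itself is inherently sequential, and only the work inside my_fancy_solver can be parallelized.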