Search code examples
python-3.xoptimizationparallel-processingmultiprocessingjoblib

How to speed up optimization using parallel


I am doing optimization using optimize.minimize from scipy, say the objective function is fun.

I need to do optimization for each row of my dataframe, and currently I am using Parallel from joblib:

import pandas as pd
import time
from joblib import Parallel,delayed
import multiprocessing
from scipy.optimize import basinhopping
import numpy as np
import jax

df=pd.DataFrame()
rng = np.random.default_rng()
df[["p11", "p12", "p13"]]=rng.dirichlet(np.ones(3),size=1120)
df[["s1", "s2", "s3"]]=rng.dirichlet(np.ones(3),size=1120)


num_cores = multiprocessing.cpu_count()-1

def get_attraction(b1,b2, b3):
    c1 = 20 * b1 + 12 * b2 + 6 * b3
    c2 = 12 * b1 + 24 * b2 + 18 * b3
    c3 = 0 * b1 + 14 * b2 + 30 * b3
    return c1,c2,c3

def get_a_tau_max(p1_tau, p2_tau, p3_tau, a):
    b1_tau_l1_a1_a1 = p1_tau * (1 - a) + a
    b2_tau_l1_a1_a1 = p2_tau * (1 - a)
    b3_tau_l1_a1_a1 = p3_tau * (1 - a)

    a1_tau_l1_a1, a2_tau_l1_a1, a3_tau_l1_a1 = get_attraction(b1_tau_l1_a1_a1, b2_tau_l1_a1_a1,b3_tau_l1_a1_a1)

    b1_tau_l1_a1_a2 = p1_tau * (1 - a)
    b2_tau_l1_a1_a2 = p2_tau * (1 - a) + a
    b3_tau_l1_a1_a2 = p3_tau * (1 - a)
    a1_tau_l1_a2, a2_tau_l1_a2, a3_tau_l1_a2 = get_attraction(b1_tau_l1_a1_a2, b2_tau_l1_a1_a2,b3_tau_l1_a1_a2)

    b1_tau_l1_a1_a3 = p1_tau * (1 - a)
    b2_tau_l1_a1_a3 = p2_tau * (1 - a)
    b3_tau_l1_a1_a3 = p3_tau * (1 - a) + a
    a1_tau_l1_a3, a2_tau_l1_a3, a3_tau_l1_a3 = get_attraction(b1_tau_l1_a1_a3, b2_tau_l1_a1_a3,b3_tau_l1_a1_a3)

    a_tau_max = max(a1_tau_l1_a1, a2_tau_l1_a2, a3_tau_l1_a3)

    return a_tau_max

def get_signal_conditional_at(at, b1, b2, b3, a):

    if at==1:
        b1_tau = b1 * (1-a) + a
        b2_tau = b2 * (1-a)
        b3_tau = b3 * (1-a)
    elif at==2:
        b1_tau = b1 * (1-a)
        b2_tau = b2 * (1-a) + a
        b3_tau = b3 * (1-a)
    else:
        b1_tau = b1 * (1-a)
        b2_tau = b2 * (1-a)
        b3_tau = b3 * (1-a) + a
    return b1_tau, b2_tau, b3_tau

def get_belief(index,row):
    if index <= 1118:
        df_mask = pd.DataFrame()
        df_mask["weight"] = [0.8 ** i for i in range(0, index+1)][::-1]
        dem = df_mask["weight"].sum()

        subdf = df.iloc[:index].copy()
        s1_history = subdf["s1"].values.tolist()
        s2_history = subdf["s2"].values.tolist()
        s3_history = subdf["s3"].values.tolist()

        belief=[]
        for at in [[row["b11"], row["b12"], row["b13"]],
                    [row["b21"], row["b22"], row["b23"]],
                    [row["b31"], row["b32"], row["b32"]]]:

            df_mask["s1"] = s1_history + [at[0]]
            df_mask["s2"] = s2_history + [at[1]]
            df_mask["s3"] = s3_history + [at[2]]

            nom1 = (df_mask["weight"] * df_mask["s1"]).sum()
            nom2 = (df_mask["weight"] * df_mask["s2"]).sum()
            nom3 = (df_mask["weight"] * df_mask["s3"]).sum()

            b1 = nom1 / dem
            b2 = nom2 / dem
            b3 = nom3 / dem
            belief += [b1,b2,b3]
        return belief
    else:
        return [0,0,0,0,0,0,0,0,0]


def get_prob(x, index,row, lamda, a): #
    p1, p2, p3 = x[0], x[1], x[2]

    p11 = row["p11"]
    p12 = row["p12"]
    p13 = row["p13"]

    b1 = p11 * (1-a) + p1 * a
    b2 = p12 * (1-a) + p2 * a
    b3 = p13 * (1-a) + p3 * a

    c1,c2,c3=get_attraction(b1,b2,b3)

    row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, p11,p12,p13,a)
    row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, p11,p12,p13,a)
    row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, p11,p12,p13,a)

    belief = get_belief(index,row)

    t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
    t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
    t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)

    a1 = c1+t1
    a2 = c2+t2
    a3 = c3+t3

    nom1 = np.exp(lamda*a1)
    nom2 = np.exp(lamda*a2)
    nom3 = np.exp(lamda*a3)
    dem = nom1 + nom2 + nom3

    p1_t = nom1 / dem
    p2_t = nom2 / dem
    p3_t = nom3 / dem

    return (p1_t - p1) ** 2 + (p2_t - p2) ** 2 + (p3_t - p3) ** 2

def get_root(index,row,lamda,a):
        fun = get_prob
        #jac_ = jax.jacfwd(fun)
        result = basinhopping(fun, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=np.random.seed(0),
                              minimizer_kwargs={'args': (index,row,lamda,a), #'jac':jac_,
                                                'method': "SLSQP",
                                                'tol': 1.0e-4,
                                                'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
                                                'constraints': {"type": 'eq',
                                                                'fun': lambda x: 1 - x[0] - x[1] - x[2],
                                                                'jac': lambda x: np.full_like(x,-1)}})
        x=np.append(result.x, index)
        return x

start = time.time()
p_sv = Parallel(n_jobs=num_cores)(delayed(get_root)(index=index, row=row, lamda=0.5,a=7/13)
                                  for index, row in df.iterrows())
print("elapsed:", time.time() - start)
elapsed: 240.9707179069519

Each optimization is independent, there is no information exchange between the rows.

It takes very long to finish the optimization for the whole dataset, and it only occupies about 30% of CPU (I have M1 pro).

My questions is how many n_jobs I should set (or there is some other way such as change backend) to make it use 100% CPU such that I can finish this program faster?

I have access to a computer cluster which has 2 CPUs and 64 core per CPU. I have tried to set n_job=64 it does not provide significant improvement (I am still learning how to utilize 2 CPUs..).

Update: Now I figured that providing Jacobian of the objective function slows down optimization. With 80 rows, with jac_ it takes 64 seconds, without it it take 20 seconds (and CPU goes almost 100%). But why? I thought providing Jacobian would make it faster.

Update: I add an fun and df with 1120 rows. If I do not use jac_, it is about 5 times faster using the cluster.


Solution

  • This is not a multi-processing issue but an algorithmic complexity one. The function get_prob you are trying to optimize depends on get_belief which is of worst-case linear complexity with respect to the index number. The consequence is that your program is of quadratic complexity, which can be a huge slowdown.

    In itself, the function get_belief if very slow. The problem is that get_prob (and thus get_belief) will be called many times by the optimizer during the optimization process, thus slowing down the whole execution. If you remove it or replace it by a constant value (for instance replace index <= 118 by False to short-circuit the expensive code path) you'll see that your program is significantly faster. On my computer this resulted in a 20x speed.

    However, one can note that get_belief does not depend on the variable x of the optimized function fun but only on the row parameters. This likely means that you could pre-compute get_belief for each row of your data-frame instead of computing it inside the optimized function, thus improving the execution speed by a large margin.