Search code examples
pythonparallel-processingmultiprocessingdaskdask-delayed

How to improve efficiency on parallel loops in Python


I'm intrigued on how less efficient are parallel loops in Python compared to parloop from Matlab. Here I am presenting a simple root-finding problem brute-forcing initial 10^6 initial guesses between a and b.

import numpy as np
from scipy.optimize import root
import matplotlib.pyplot as plt
import multiprocessing

# define the function to find the roots
func = lambda x: np.sin(3*np.pi*np.cos(np.pi*x)*np.sin(np.pi*x))

def forfunc(x0):
    q = [root(func, xi).x for xi in x0]
    q = np.array(q).T[0]
    return q

# variables os the problem
a = -3
b = 5
n = int(1e6)
x0 = np.linspace(a,b,n) # list of initial guesses

# the single-process loop
q = forfunc(x0)

# parallel loop
nc = 4
pool = multiprocessing.Pool(processes=nc)
q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
pool.close()

The single-process loop takes 1min 26s of wall time and the parallel loop takes 1min 7s. I see some improvement as the speedup is 1.28, but the efficiency (timeloop/timeparallel/n_process) is 0.32 in this case.

What is happening here and how to improve this efficiency? Am I doing something wrong?

I also tried using dask.delayed in two ways:

import dask

# Every call is a delayed object
q = dask.compute(*[dask.delayed(func)(xi) for xi in x0])

# Every chunk is a delayed object
q = dask.compute(*[dask.delayed(forfunc)(x0i) for x0i in np.split(x0,nc)])

And here both takes more time than the single-process loop. The wall time for the first try is 3min and for the second try it took 1min 27s.


Solution

  • What's Happening with Dask (or Spark)

    From your single-process tests, your loop executes one million tasks in 90 seconds. Thus, each task takes your CPU about 90 microseconds in the average case.

    In distributed computing frameworks like Dask or Spark that provide flexibility and resiliency, tasks have a small overhead associated with them. Dask's overhead is as low as 200 microseconds per task. The Spark 3.0 documentation suggests that Spark can support tasks as short as 200 milliseconds, which perhaps means Dask actually has 1000x less overhead than Spark. It sounds like Dask is actually doing really well here!

    If your tasks are faster than the per-task overhead of your framework, you'll simply see worse performance using it relative to manually distributing your work across the same number of machines/cores. In this case, you're running into that scenario.

    In your chunked data Dask example you have only a few tasks, so you see better performance from reduced overhead. But, you are either likely taking a small performance hit from the overhead of Dask relative to raw multiprocessing, or you're not using a Dask cluster and running the tasks a single process.

    Multiprocessing (and Dask) Should Help

    Your results with multiprocessing are generally unexpected for this kind of embarrassingly parallel problem. You may want to confirm the number of physical cores on your machine and in particular make sure nothing else is actively utilizing your CPU cores. Without knowing anything else, I would guess that's the culprit.

    On my laptop with two physical cores, your example takes:

    • 2min 1s for the single process loop
    • 1min 2s for two processes
    • 1min for four processes
    • 1min 5s for a chunked Dask example with nc=2 to split into two chunks and a LocalCluster of two workers and one thread per worker. It may be worth double checking you're running on a cluster.

    Getting a roughly 2x speedup with two processes is line with expectations on my laptop, as is seeing minimal or no benefit from more processes for this CPU bound task. Dask also adds a bit of overhead relative to raw multiprocessing.

    %%time
    ​
    # the single-process loop
    q = forfunc(x0)
    CPU times: user 1min 55s, sys: 1.68 s, total: 1min 57s
    Wall time: 2min 1s
    
    %%time
    ​
    # parallel loop
    nc = 2
    pool = multiprocessing.Pool(processes=nc)
    q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
    pool.close()
    CPU times: user 92.6 ms, sys: 70.8 ms, total: 163 ms
    Wall time: 1min 2s
    
    %%time
    ​
    # parallel loop
    nc = 4
    pool = multiprocessing.Pool(processes=nc)
    q = np.hstack(pool.map(forfunc,np.split(x0,nc)))
    pool.close()
    CPU times: user 118 ms, sys: 94.6 ms, total: 212 ms
    Wall time: 1min
    
    from dask.distributed import Client, LocalCluster, wait
    client = Client(n_workers=2, threads_per_worker=1)
    
    %%time
    ​
    nc = 2
    chunks = np.split(x0,nc)
    client.scatter(chunks, broadcast=True)
    q = client.compute([dask.delayed(forfunc)(x0i) for x0i in chunks])
    wait(q)
    /Users/nickbecker/miniconda3/envs/prophet/lib/python3.7/site-packages/distributed/worker.py:3382: UserWarning: Large object of size 4.00 MB detected in task graph: 
      (array([1.000004, 1.000012, 1.00002 , ..., 4.99998 ... 2, 5.      ]),)
    Consider scattering large objects ahead of time
    with client.scatter to reduce scheduler burden and 
    keep data on workers
    
        future = client.submit(func, big_data)    # bad
    
        big_future = client.scatter(big_data)     # good
        future = client.submit(func, big_future)  # good
      % (format_bytes(len(b)), s)
    CPU times: user 3.67 s, sys: 324 ms, total: 4 s
    Wall time: 1min 5s