Search code examples
pythonmultiprocessingpython-multiprocessingmontecarlo

How to implement multiprocessing in Monte Carlo integration


I created a Python program that integrates a given function over a given interval using Monte Carlo simulation. It works well, except for the fact that it runs painfully slow when you want higher levels of accuracy (larger N value). I figured I'd give multiprocessing a try in order to speed it up, but then I realized I have no clue how to implement it. Here's what I have right now:

from scipy import random
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Process
import os

# GOAL: Approximate the integral of a function f(x) from lower bound a to upper bound b using Monte Carlo simulation

# bounds of integration
a = 0
b = np.pi

# function to integrate
def f(x):
    return np.sin(x)

N = 10000
areas = []


def mcIntegrate():
    
    for i in range(N):
        
        # array filled with random numbers between limits
        xrand = random.uniform(a, b, N)
        
        # sum the return values of the function of each random number
        integral = 0.0
        for i in range(N):
            integral += f(xrand[i])
        
        # scale integral by difference of bounds divided by amount of random values
        ans = integral * ((b - a) / float(N))
        
        # add approximation to list of other approximations
        areas.append(ans)


if __name__ == "__main__":

    processes = []
    numProcesses = os.cpu_count()

    for i in range(numProcesses):
        process = Process(target=mcIntegrate)
        processes.append(process)

    for process in processes:
        process.start()

    for process in processes:
        process.start()

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()

Can I get some help with this implementation?


Took advice from the comments and used multiprocessor.Pool, and also cut down on some operations by using NumPy instead. Went from taking about 5min to run to now about 6sec (for N = 10000). Here's my implementation:

import scipy
import numpy as np
import matplotlib.pyplot as plt
import multiprocessing
import os

# GOAL: Approximate the integral of function f from lower bound a to upper bound b using Monte Carlo simulation

a = 0       # lower bound of integration
b = np.pi   # upper bound of integration
f = np.sin  # function to integrate
N = 10000   # sample size

def mcIntegrate(p):
    xrand = scipy.random.uniform(a, b, N)       # create array filled with random numbers within bounds
    integral = np.sum(f(xrand))                 # sum return values of function of each random number
    approx = integral * ((b - a) / float(N))    # scale integral by difference of bounds divided by sample size
    return approx


if __name__ == "__main__":
    
    # run simulation N times in parallel and store results in array
    with multiprocessing.Pool(os.cpu_count()) as pool:
        areas = pool.map(mcIntegrate, range(N))

    # graph approximation distribution
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec='black')
    plt.xlabel("Areas")
    plt.show()


Solution

  • This turned out to be a more interesting problem than I thought it would when I got to optimising it. The basic method is very simple:

    from multiprocessing import pool
    
    def f(x):
        return x
    
    results = pool.map(f, range(100))
    

    Here is your mcIntegerate adapted for multiprocessing:

    from tqdm import tqdm
    
    def mcIntegrate(steps):
        tasks = []
    
        print("Setting up simulations")
    
        # linear
        for _ in tqdm(range(steps)):
            xrand = random.uniform(a, b, steps)
            for i in range(steps):
                tasks.append(xrand[i])
    
        pool = Pool(cpu_count())
    
        print("Simulating (no progress)")
        results = pool.map(f, tasks)
        pool.close()
    
        print("summing")
        areas = []
        for chunk in tqdm(range(steps)):
            vals = results[chunk * steps : (chunk + 1) * steps]
            integral = sum(vals)
            ans = integral * ((b - a) / float(steps))
            areas.append(ans)
    
        return areas
    

    tqdm is just used to display a progress bar.

    This is the basic workflow for multiprocessing: break the question up into tasks, solve all the tasks, then add them all back together again. And indeed the code as given works. (Note that I've changed your N for steps).

    For completeness, the script now begins:

    from scipy import random
    import numpy as np
    import matplotlib.pyplot as plt
    from multiprocessing import Pool, cpu_count
    from tqdm import tqdm
    
    # function to integrate
    def f(x):
        return np.sin(x)
    

    and ends

    areas = mcIntegrate(3_000)
    
    a = 0
    b = np.pi
    
    plt.title("Distribution of Approximated Integrals")
    plt.hist(areas, bins=30, ec="black")
    plt.xlabel("Areas")
    plt.show()
    

    Optimisation

    I deliberately split the problem up at the smallest possible level. Was this a good idea? To answer that, consider: how might we optimise the linear process of generating the tasks? This does take a considerable while at the moment. We could parallelise it:

    def _prepare(steps):
        xrand = random.uniform(a, b, steps)
        return [xrand[i] for i in range(steps)]
    
    def mcIntegrate(steps):
        ...
        tasks = []
        for res in tqdm(pool.imap(_prepare, (steps for _ in range(steps))), total=steps):
            tasks += res  # slower except for very large steps
    

    Here I've used pool.imap, which returns an iterator which we can iterate as soon as the results are available, allowing us to build a progress bar. If you do this and compare, you will see that it runs slower than the linear solution. Removing the progress bar (on my machine) and replace with:

        import time
    
        start = time.perf_counter()
        results = pool.map(_prepare, (steps for _ in range(steps)))
        tasks = []
        for res in results:
            tasks += res
        print(time.perf_counter() - start)
    

    Is only marginally faster: it's still slower than running linear. Serialising data to a process and then deserialising it has an overhead. If you try to get a progress bar on the whole thing, it becomes excruciatingly slow:

        results = []
        for result in tqdm(pool.imap(f, tasks), total=len(tasks)):
            results.append(result)
    

    So what about iterating at a higher level? Here's another adaption of your mcIterate:

    a = 0
    b = np.pi
    
    def _mcIntegrate(steps):
        xrand = random.uniform(a, b, steps)
        integral = 0.0
        for i in range(steps):
            integral += f(xrand[i])
        ans = integral * ((b - a) / float(steps))
    
        return ans
    
    
    def mcIntegrate(steps):
        areas = []
        p = Pool(cpu_count())
        for ans in tqdm(p.imap(_mcIntegrate, ((steps) for _ in range(steps))), total=steps):
            areas.append(ans)
    
        return areas
    

    This, on my machine, is considerably faster. It's also considerably simpler. I was expecting a difference, but not such a considerable difference.

    Takeaways

    Multiprocessing isn't free. Something as simple as np.sin() is too cheap to multprocess: we pay to serialise, deserialise, append, and so on, all for one sin() calculation. But if you do too many calculations, you will waste time as you lose granularity. Here the effect is more striking than I was expecting. The only way to know the right level of granularity for a particular problem... is to profile and try.