Search code examples
pythonperformancenumpyscipyprobability

Speeding up normal distribution probability mass allocation


We have N users with P avg. points per user, where each point is a single value between 0 and 1. We need to distribute the mass of each point using a normal distribution with a known density of 0.05 as the points have some uncertainty. Additionally, we need to wrap the mass around 0 and 1 such that e.g. a point at 0.95 will also allocate mass around 0. I've provided a working example below, which bins the normal distribution into D=50 bins. The example uses the Python typing module, but you can ignore that if you'd like.

from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt

D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()


def probability_mass(distribution: Any, x0: float, x1: float) -> float:
    """
    Computes the area under the distribution, wrapping at 1.
    The wrapping is done by adding the PDF at +- 1.
    """
    assert x1 > x0
    return (
        (distribution.cdf(x1) - distribution.cdf(x0))
        + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
        + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
    )


def point_density(x: float) -> List[float]:
    distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
    density: List[float] = []
    for i in range(D):
        density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
    return density


def user_density(points: List[float]) -> Any:

    # Find the density of each point
    density: Any = np.array([point_density(p) for p in points])

    # Combine points and normalize
    combined = density.sum(axis=0)
    return combined / combined.sum()


if __name__ == "__main__":

    # Example for one user
    data: List[float] = [.05, .3, .5, .5]
    density = user_density(data)

    # Example for multiple users (N = 2)
    print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])

    ### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
    ### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
    middle: List[float] = []
    for i in range(D):
        middle.append((BINS[i] + BINS[i + 1]) / 2)
    plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
    plt.xlim(0, 1)
    plt.xlabel("x")
    plt.ylabel("Density")
    plt.show()

matplotlib output

In this example N=1, D=50, P=4. However, we want to scale this approach to N=10000 and P=100 while being as fast as possible. It's unclear to me how we'd vectorize this approach. How do we best speed up this?

EDIT

The faster solution can have slightly different results. For instance, it could approximate the normal distribution instead of using the precise normal distribution.

EDIT2

We only care about computing density using the user_density() function. The plot is only to help explain the approach. We do not care about the plot itself :)

EDIT3

Note that P is the avg. points per user. Some users may have more and some may have less. If it helps, you can assume that we can throw away points such that all users have a max of 2 * P points. It's fine to ignore this part while benchmarking as long as the solution can handle a flexible # of points per user.


Solution

  • You could get below 50ms for largest case (N=10000, AVG[P]=100, D=50) by using using FFT and creating data in numpy friendly format. Otherwise it will be closer to 300 msec.

    The idea is to convolve a single normal distribution centered at 0 with a series Dirac deltas.

    See image below: enter image description here

    Using circular convolution solves two issues.

    • naturally deals with wrapping at the edges
    • can be efficiently computed with FFT and Convolution Theorem

    First one must create a distribution to be copied. Function mk_bell() created a histogram of a normal distribution of stddev 0.05 centered at 0. The distribution wraps around 1. One could use arbitrary distribution here. The spectrum of the distribution is computed are used for fast convolution.

    Next a comb-like function is created. The peaks are placed at indices corresponding to peaks in user density. E.g.

    peaks_location = [0.1, 0.3, 0.7]
    D = 10
    

    maps to

    peak_index = (D * peak_location).astype(int) = [1, 3, 7]
    dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]
    

    You can quickly create a composition of Diract Deltas by computing indices of the bins for each peak location with help of np.bincount() function. In order to speed things even more one can compute comb-functions for user-peaks in parallel.

    Array dist is 2D-array of shape NxD. It can be linearized to 1D array of shape (N*D). After this change element on position [user_id, peak_index] will be accessible from index user_id*D + peak_index. With numpy-friendly input format (described below) this operation is easily vectorized.

    The convolution theorem says that spectrum of convolution of two signals is equal to product of spectrums of each signal.

    The spectrum is compute with numpy.fft.rfft which is a variant of Fast Fourier Transfrom dedicated to real-only signals (no imaginary part).

    Numpy allows to compute FFT of each row of the larger matrix with one command.

    Next, the spectrum of convolution is computed by simple multiplication and use of broadcasting.

    Next, the spectrum is computed back to "time" domain by Inverse Fourier Transform implemented in numpy.fft.irfft.

    To use the full speed of numpy one should avoid variable size data structure and keep to fixed size arrays. I propose to represent input data as three arrays.

    • uids the identifier for user, integer 0..N-1
    • peaks, the location of the peak
    • mass, the mass of the peek, currently it is 1/numer-of-peaks-for-user

    This representation of data allows quick vectorized processing. Eg:

    user_data = [[0.1, 0.3], [0.5]]
    

    maps to:

    uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
    peaks = [0.1, 0.3, 0.5] # serialized user_data
    mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0
    

    The code:

    import numpy as np
    import matplotlib.pyplot as plt
    import time
    
    def mk_bell(D, SIGMA):
        # computes normal distribution wrapped and centered at zero
        x = np.linspace(0, 1, D, endpoint=False);
        x = (x + 0.5) % 1 - 0.5
        bell = np.exp(-0.5*np.square(x / SIGMA))
        return bell / bell.sum()
    
    def user_densities_by_fft(uids, peaks, mass, D, N=None):
        bell = mk_bell(D, 0.05).astype('f4')
        sbell = np.fft.rfft(bell)
        if N is None:
            N = uids.max() + 1
        # ensure that peaks are in [0..1) internal
        peaks = peaks - np.floor(peaks)
        # convert peak location from 0-1 to the indices
        pidx = (D * (peaks + uids)).astype('i4')
        dist = np.bincount(pidx, mass, N * D).reshape(N, D)
        # process all users at once with Convolution Theorem
        sdist = np.fft.rfft(dist)
        sdist *= sbell
        res = np.fft.irfft(sdist)
    
        return res
    
    def generate_data(N, Pmean):
        # generateor for large data
        data = []
        for n in range(N):
            # select P uniformly from 1..2*Pmean
            P = np.random.randint(2 * Pmean) + 1
            # select peak locations
            chunk = np.random.uniform(size=P)
            data.append(chunk.tolist())
        return data
    
    def make_data_numpy_friendly(data):
        uids = []
        chunks = []
        mass = []
        for uid, peaks in enumerate(data):
            uids.append(np.full(len(peaks), uid))
            mass.append(np.full(len(peaks), 1 / len(peaks)))
            chunks.append(peaks)
        return np.hstack(uids), np.hstack(chunks), np.hstack(mass)
    
    
    
    
    D = 50
    
    # demo for simple multi-distribution
    data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
    uids, peaks, mass = make_data_numpy_friendly(data)
    dist = user_densities_by_fft(uids, peaks, mass, D, N)
    plt.plot(dist.T)
    plt.show()
    
    # the actual measurement
    N = 10000
    P = 100
    data = generate_data(N, P)
    
    tic = time.time()
    uids, peaks, mass = make_data_numpy_friendly(data)
    toc = time.time()
    print(f"make_data_numpy_friendly: {toc - tic}")
    
    tic = time.time()
    dist = user_densities_by_fft(uids, peaks, mass, D, N)
    toc = time.time()
    print(f"user_densities_by_fft: {toc - tic}")
    
    

    The results on my 4-core Haswell machine are:

    make_data_numpy_friendly: 0.2733159065246582
    user_densities_by_fft: 0.04064297676086426
    

    It took 40ms to process the data. Notice that processing data to numpy friendly format takes 6 times more time than the actual computation of distributions. Python is really slow when it comes to looping. Therefore I strongly recommend to generate input data directly in numpy-friendly way in the first place.

    There are some issues to be fixed:

    • precision, can be improved by using larger D and downsampling
    • accuracy of peak location could be further improved by widening the spikes.
    • performance, scipy.fft offers move variants of FFT implementation that may be faster