Search code examples
pythonnumpydata-analysis

Faster Way to Implement Gaussian Smoothing? (Python 3.10, NumPy)


I'm attempting to implement a Gaussian smoothing/flattening function in my Python 3.10 script to flatten a set of XY-points. For each data point, I'm creating a Y buffer and a Gaussian kernel, which I use to flatten each one of the Y-points based on it's neighbours.

Here are some sources on the Gaussian-smoothing method:

I'm using the NumPy module for my data arrays, and MatPlotLib to plot the data.

I wrote a minimal reproducible example, with some randomly-generated data, and each one of the arguments needed for the Gaussian function listed at the top of the main function:

import numpy as np
import matplotlib.pyplot as plt
import time

def main():
    dataSize = 1000
    yDataRange = [-4, 4]
    reachPercentage = 0.1
    sigma = 10
    phi = 0
    amplitude = 1

    testXData = np.arange(stop = dataSize)
    testYData = np.random.uniform(low = yDataRange[0], high = yDataRange[1], size = dataSize)

    print("Flattening...")
    startTime = time.time()

    flattenedYData = GaussianFlattenData(testXData, testYData, reachPercentage, sigma, phi, amplitude)

    totalTime = round(time.time() - startTime, 2)
    print("Flattened! (" + str(totalTime) + " sec)")

    plt.title(str(totalTime) + " sec")
    plt.plot(testXData, testYData, label = "Original Data")
    plt.plot(testXData, flattenedYData, label = "Flattened Data")
    plt.legend()

    plt.show()

    plt.close()

def GaussianFlattenData(xData, yData, reachPercentage, sigma, phi, amplitude):
    flattenedYData = np.empty(shape = len(xData), dtype = float)

    # For each data point, create a Y buffer and a Gaussian kernel, and flatten it based on it's neighbours
    for i in range(len(xData)):
        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[0]), xData)), 
                            FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]

        # Creating Y buffer and Gaussian kernel...
        currYPoints = np.empty(shape = currDataScanNum, dtype = float)
        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            currYPoints[j] = yData[j + reachEdgeIndices[1]]
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

        # Dividing kernel by its sum...
        kernelSum = np.sum(kernel)

        for j in range(len(kernel)):
            kernel[j] = (kernel[j] / kernelSum)

        # Acquiring the current flattened Y point...
        newCurrYPoints = np.empty(shape = len(currYPoints), dtype = float)

        for j in range(len(currYPoints)):
            newCurrYPoints[j] = currYPoints[j] * kernel[j]

        flattenedYData[i] = np.sum(newCurrYPoints)

    return flattenedYData

def GetGaussianValueX(y, mu, sigma, phi, amplitude):
    x = ((sigma * np.sqrt(-2 * np.log(y / (amplitude * np.cos(phi))))) + mu)

    return [x, (mu - (x - mu))]

def GetGaussianValueY(x, mu, sigma, phi, amplitude):
    y = ((amplitude * np.cos(phi)) * np.exp(-np.power(((x - mu) / sigma), 2) / 2))

    return y

def GetClosestNum(base, nums):
    closestIdx = 0
    closestDiff = np.abs(base - nums[0])
    idx = 1

    while (idx < len(nums)):
        currDiff = np.abs(base - nums[idx])

        if (currDiff < closestDiff):
            closestDiff = currDiff
            closestIdx = idx

        idx += 1

    return nums[closestIdx]

def FindInArray(arr, value):
    for i in range(len(arr)):
        if (arr[i] == value):
            return i

    return -1

if (__name__ == "__main__"):
    main()

In the example above, I generate 1,000 random data points, between the ranges of -4 and 4. The reachPercentage variable is the percentage of the Gaussian amplitude above which the Gaussian values will be inserted into the kernel. The sigma, phi and amplitude variables are all inputs to the Gaussian function which will actually generate the Gaussians for each Y-data point to be smoothened.

I wrote some additional utility functions which I needed as well.

The script above works to smoothen the generated data, and I get the following plot:

Blue being the original data, and Orange being the flattened data.

However, it takes a surprisingly long amount of time to smoothen even smaller amounts of data. In the example above I generated 1,000 data points, and it takes ~8 seconds to flatten that. With datasets exceeding 10,000 in number, it can easily take over 10 minutes.

Since this is a very popular and known way of smoothening data, I was wondering why this script ran so slow. I originally had this implemented with standard Pythons Lists with calling append, however it was extremely slow. I hoped that using the NumPy arrays instead without calling the append function would make it faster, but that is not really the case.

Is there a way to speed up this process? Is there a Gaussian-smoothing function that already exists out there, that takes in the same arguments, and that could do the job faster?

Thanks for reading my post, any guidance is appreciated.


Solution

  • After asking people on the Python forums, as well as doing some more searching online, I managed to find much faster alternatives to most of the functions I had in my loop.

    In order to get a better image of which parts of the smoothing function took up the most time, I subdivided the code into 4 parts, and timed each one to see how much each part contributed to the total runtime. To my surprise, the part that took up over 90% of the time, was the first part of the loop:

            gaussianCenter = xData[i]
            baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
            reachEdgeIndices = [FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[0]), xData)), 
                                FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))]
            currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]
    

    Luckly, the people on the Python forums and here were able to assist me, and I was able find a much faster alternative GetClosestNum function (thanks Vin), as well as removing the FindInArray function:

    There are also replacements in the latter parts of the loop, where instead of having 3 for loops, they were all replaced my NumPy iterative expressions.

    The whole script now looks like this:

    import numpy as np
    import matplotlib.pyplot as plt
    import time
    
    def main():
        dataSize = 3073
        yDataRange = [-4, 4]
        reachPercentage = 0.001
        sigma = 100
        phi = 0
        amplitude = 1
    
        testXData = np.arange(stop = dataSize)
        testYData = np.random.uniform(low = yDataRange[0], high = yDataRange[1], size = dataSize)
    
        print("Flattening...")
        startTime = time.time()
    
        flattenedYData = GaussianFlattenData(testXData, testYData, reachPercentage, sigma, phi, amplitude)
    
        totalTime = round(time.time() - startTime, 2)
        print("Flattened! (" + str(totalTime) + " sec)")
    
        plt.title(str(totalTime) + " sec")
        plt.plot(testXData, testYData, label = "Original Data")
        plt.plot(testXData, flattenedYData, label = "Flattened Data")
        plt.legend()
    
        plt.show()
    
        plt.close()
    
    def GaussianFlattenData(xData, yData, reachPercentage, sigma, phi, amplitude):
        flattenedYData = np.empty(shape = len(xData), dtype = float)
    
        # For each data point, create a Y buffer and a Gaussian kernel, and flatten it based on it's neighbours
        for i in range(len(xData)):
            gaussianCenter = xData[i]
            baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
            reachEdgeIndices = [np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[0]), xData))[0][0], 
                                np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))[0][0]]
            currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]
    
            # Creating Y buffer and Gaussian kernel...
            currYPoints = yData[reachEdgeIndices[1] : reachEdgeIndices[1] + currDataScanNum]
            kernel = GetGaussianValueY(np.arange(currDataScanNum), (i - reachEdgeIndices[1]), sigma, phi, amplitude)
    
            # Acquiring the current flattened Y point...
            flattenedYData[i] = np.sum(currYPoints * (kernel / np.sum(kernel)))
    
        return flattenedYData
    
    def GetGaussianValueX(y, mu, sigma, phi, amplitude):
        x = ((sigma * np.sqrt(-2 * np.log(y / (amplitude * np.cos(phi))))) + mu)
    
        return [x, (mu - (x - mu))]
    
    def GetGaussianValueY(x, mu, sigma, phi, amplitude):
        y = ((amplitude * np.cos(phi)) * np.exp(-np.power(((x - mu) / sigma), 2) / 2))
    
        return y
    
    def GetClosestNum(base, nums):
        nums = np.asarray(nums)
    
        return nums[(np.abs(nums - base)).argmin()]
    
    if (__name__ == "__main__"):
        main()
    

    Instead of taking ~8 seconds to process the 1,000 data points, it now takes merely ~0.15 seconds!

    enter image description here

    It also takes ~1.75 seconds to process the 10,000 points.

    Thanks for the feedback everyone, cheers!