I'm attempting to implement a Gaussian smoothing/flattening function in my Python 3.10
script to flatten a set of XY-points. For each data point, I'm creating a Y buffer and a Gaussian kernel, which I use to flatten each of the Y-points based on its neighbours.
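In other words, each flattened point is just a kernel-weighted average of the Y-values in a window around it; written with the variable names used in the scripts below:

flattenedYData[i] = np.sum(currYPoints * (kernel / np.sum(kernel)))

where kernel holds the Gaussian values centred on point i.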
I based my approach on some online sources describing the Gaussian-smoothing method.
I'm using the NumPy module for my data arrays, and Matplotlib to plot the data.
I wrote a minimal reproducible example with some randomly generated data; each of the arguments needed for the Gaussian function is listed at the top of the main function:
import numpy as np
import matplotlib.pyplot as plt
import time
def main():
    dataSize = 1000
    yDataRange = [-4, 4]
    reachPercentage = 0.1
    sigma = 10
    phi = 0
    amplitude = 1

    testXData = np.arange(stop = dataSize)
    testYData = np.random.uniform(low = yDataRange[0], high = yDataRange[1], size = dataSize)

    print("Flattening...")
    startTime = time.time()
    flattenedYData = GaussianFlattenData(testXData, testYData, reachPercentage, sigma, phi, amplitude)
    totalTime = round(time.time() - startTime, 2)
    print("Flattened! (" + str(totalTime) + " sec)")

    plt.title(str(totalTime) + " sec")
    plt.plot(testXData, testYData, label = "Original Data")
    plt.plot(testXData, flattenedYData, label = "Flattened Data")
    plt.legend()

    plt.show()
    plt.close()
def GaussianFlattenData(xData, yData, reachPercentage, sigma, phi, amplitude):
    flattenedYData = np.empty(shape = len(xData), dtype = float)
    # For each data point, create a Y buffer and a Gaussian kernel, and flatten it based on its neighbours
    for i in range(len(xData)):
        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[0]), xData)),
                            FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]

        # Creating Y buffer and Gaussian kernel...
        currYPoints = np.empty(shape = currDataScanNum, dtype = float)
        kernel = np.empty(shape = currDataScanNum, dtype = float)

        for j in range(currDataScanNum):
            currYPoints[j] = yData[j + reachEdgeIndices[1]]
            kernel[j] = GetGaussianValueY(j, (i - reachEdgeIndices[1]), sigma, phi, amplitude)

        # Dividing kernel by its sum...
        kernelSum = np.sum(kernel)

        for j in range(len(kernel)):
            kernel[j] = (kernel[j] / kernelSum)

        # Acquiring the current flattened Y point...
        newCurrYPoints = np.empty(shape = len(currYPoints), dtype = float)

        for j in range(len(currYPoints)):
            newCurrYPoints[j] = currYPoints[j] * kernel[j]

        flattenedYData[i] = np.sum(newCurrYPoints)

    return flattenedYData
def GetGaussianValueX(y, mu, sigma, phi, amplitude):
    x = ((sigma * np.sqrt(-2 * np.log(y / (amplitude * np.cos(phi))))) + mu)

    return [x, (mu - (x - mu))]

def GetGaussianValueY(x, mu, sigma, phi, amplitude):
    y = ((amplitude * np.cos(phi)) * np.exp(-np.power(((x - mu) / sigma), 2) / 2))

    return y

def GetClosestNum(base, nums):
    closestIdx = 0
    closestDiff = np.abs(base - nums[0])
    idx = 1

    while (idx < len(nums)):
        currDiff = np.abs(base - nums[idx])

        if (currDiff < closestDiff):
            closestDiff = currDiff
            closestIdx = idx

        idx += 1

    return nums[closestIdx]

def FindInArray(arr, value):
    for i in range(len(arr)):
        if (arr[i] == value):
            return i

    return -1

if (__name__ == "__main__"):
    main()
In the example above, I generate 1,000 random data points between -4 and 4. The reachPercentage variable is the percentage of the Gaussian amplitude above which the Gaussian values will be inserted into the kernel. The sigma, phi and amplitude variables are all inputs to the Gaussian function that actually generates the Gaussians used to smooth each Y-data point.
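For a concrete sense of what reachPercentage does: inverting the Gaussian (which is what GetGaussianValueX does) shows that the kernel half-width is sigma * sqrt(-2 * ln(reachPercentage)). A quick illustrative check with the values above (this snippet is just an aside, not part of the script):

import numpy as np

sigma = 10
reachPercentage = 0.1

# Distance from the kernel centre at which the Gaussian drops to
# reachPercentage of its peak (GetGaussianValueX with mu = 0 and phi = 0).
halfWidth = sigma * np.sqrt(-2 * np.log(reachPercentage))
print(halfWidth)  # ~21.5, so each kernel covers roughly 43 neighbouring points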
I wrote some additional utility functions which I needed as well.
The script above works to smooth the generated data, and I get a plot of the result: blue is the original data, and orange is the flattened data.
However, it takes a surprisingly long time to smooth even small amounts of data. In the example above I generated 1,000 data points, and it takes ~8 seconds to flatten them. With datasets exceeding 10,000 points, it can easily take over 10 minutes.
Since this is a very popular and well-known way of smoothing data, I was wondering why this script runs so slowly. I originally implemented this with standard Python lists and append calls, but it was extremely slow. I hoped that switching to NumPy arrays and dropping the append calls would make it faster, but that is not really the case.
Is there a way to speed up this process? Is there a Gaussian-smoothing function that already exists out there, that takes in the same arguments, and that could do the job faster?
Thanks for reading my post, any guidance is appreciated.
After asking people on the Python forums, as well as doing some more searching online, I managed to find much faster alternatives to most of the functions I had in my loop.
In order to get a better picture of which parts of the smoothing function took up the most time, I subdivided the code into 4 parts and timed each one to see how much it contributed to the total runtime. To my surprise, the part that took up over 90% of the time was the first part of the loop:
gaussianCenter = xData[i]
baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
reachEdgeIndices = [FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[0]), xData)),
                    FindInArray(xData, GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))]
currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]
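For anyone wanting to reproduce that breakdown, one way to accumulate per-part timings inside a loop is sketched here (a generic, self-contained example; the part bodies are placeholders, not the actual smoothing code):

import time

partTimes = [0.0, 0.0]  # accumulated seconds per part

for i in range(1000):
    t0 = time.perf_counter()
    a = sum(range(100))  # stand-in for the first part of the loop body
    t1 = time.perf_counter()
    b = a * 2            # stand-in for the second part of the loop body
    t2 = time.perf_counter()

    partTimes[0] += t1 - t0
    partTimes[1] += t2 - t1

print(partTimes)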
Luckily, the people on the Python forums and here were able to assist me, and I was able to find a much faster alternative GetClosestNum function (thanks Vin), as well as remove the FindInArray function entirely. In hindsight the slowness makes sense: for every single data point, GetClosestNum scanned the whole xData array in pure Python, and FindInArray scanned it again, so the total work grew quadratically with the data size.
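Here is the replacement GetClosestNum, exactly as it appears in the full script below:

def GetClosestNum(base, nums):
    nums = np.asarray(nums)

    return nums[(np.abs(nums - base)).argmin()]

The FindInArray calls were replaced with direct np.where(xData == ...)[0][0] lookups.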
There are also replacements in the latter parts of the loop: instead of three for loops, everything is now done with vectorized NumPy expressions.
The whole script now looks like this:
import numpy as np
import matplotlib.pyplot as plt
import time
def main():
    dataSize = 3073
    yDataRange = [-4, 4]
    reachPercentage = 0.001
    sigma = 100
    phi = 0
    amplitude = 1

    testXData = np.arange(stop = dataSize)
    testYData = np.random.uniform(low = yDataRange[0], high = yDataRange[1], size = dataSize)

    print("Flattening...")
    startTime = time.time()
    flattenedYData = GaussianFlattenData(testXData, testYData, reachPercentage, sigma, phi, amplitude)
    totalTime = round(time.time() - startTime, 2)
    print("Flattened! (" + str(totalTime) + " sec)")

    plt.title(str(totalTime) + " sec")
    plt.plot(testXData, testYData, label = "Original Data")
    plt.plot(testXData, flattenedYData, label = "Flattened Data")
    plt.legend()

    plt.show()
    plt.close()
def GaussianFlattenData(xData, yData, reachPercentage, sigma, phi, amplitude):
    flattenedYData = np.empty(shape = len(xData), dtype = float)
    # For each data point, create a Y buffer and a Gaussian kernel, and flatten it based on its neighbours
    for i in range(len(xData)):
        gaussianCenter = xData[i]
        baseReachEdges = GetGaussianValueX((GetGaussianValueY(0, 0, sigma, phi, amplitude) * reachPercentage), 0, sigma, phi, amplitude)
        reachEdgeIndices = [np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[0]), xData))[0][0],
                            np.where(xData == GetClosestNum((gaussianCenter + baseReachEdges[1]), xData))[0][0]]
        currDataScanNum = reachEdgeIndices[0] - reachEdgeIndices[1]

        # Creating Y buffer and Gaussian kernel...
        currYPoints = yData[reachEdgeIndices[1] : reachEdgeIndices[1] + currDataScanNum]
        kernel = GetGaussianValueY(np.arange(currDataScanNum), (i - reachEdgeIndices[1]), sigma, phi, amplitude)

        # Acquiring the current flattened Y point...
        flattenedYData[i] = np.sum(currYPoints * (kernel / np.sum(kernel)))

    return flattenedYData
def GetGaussianValueX(y, mu, sigma, phi, amplitude):
    x = ((sigma * np.sqrt(-2 * np.log(y / (amplitude * np.cos(phi))))) + mu)

    return [x, (mu - (x - mu))]

def GetGaussianValueY(x, mu, sigma, phi, amplitude):
    y = ((amplitude * np.cos(phi)) * np.exp(-np.power(((x - mu) / sigma), 2) / 2))

    return y

def GetClosestNum(base, nums):
    nums = np.asarray(nums)

    return nums[(np.abs(nums - base)).argmin()]

if (__name__ == "__main__"):
    main()
Instead of taking ~8 seconds to process the 1,000 data points, it now takes merely ~0.15 seconds!
It also takes ~1.75 seconds to process 10,000 points.
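One last note on the original question about an existing function: SciPy ships a ready-made 1-D Gaussian smoother, scipy.ndimage.gaussian_filter1d, which may be worth trying as well. A minimal sketch (I have not verified that its output matches the custom kernel above exactly):

import numpy as np
from scipy.ndimage import gaussian_filter1d

testYData = np.random.uniform(low = -4, high = 4, size = 1000)

# Smooth with a Gaussian kernel of standard deviation sigma;
# mode = "nearest" repeats the edge values at the boundaries.
smoothedYData = gaussian_filter1d(testYData, sigma = 10, mode = "nearest")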
Thanks for the feedback everyone, cheers!