I'm trying to optimize my spectral baseline fitting algorithm (based on this one), since, from my understanding, NumPy's `apply_along_axis` is not significantly faster than simply looping over the array's indices (the explicit loop I compared against is sketched after the code below).
It currently takes ~0.15 seconds to process a spectrum of ~4000 wavelengths, but applied to millions of spectra it would take a while, to say the least. Am I missing an obvious vectorized solution, or is the algorithm simply that heavy? An example spectra file is provided here, and its respective baseline here.
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
    The larger the parameter, the smoother the resulting baseline.\
    High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
    Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
    An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
    Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
    The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
    Padding is done using edge values to minimize boundary effects.
    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.
    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """
    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)
    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]
    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        diagonal_weights: dia_array = diags_array(current_weights, dtype=np.float64, format="dia")
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        baseline[:] = spsolve(diagonal_weights + smoothness_penalies, current_weights * padded_spectrum, permc_spec="NATURAL")
        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break  # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights
    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]
# (M, N) spectra, where M is the number of signals acquired and N, the amount of wavelengths each signal contains
spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter=",")
baseline = np.apply_along_axis(arpls, axis=1, arr=spectra)
np.savetxt("baseline.csv", baseline, delimiter=",")
I don't think there's anything that will make this wildly faster: it spends the majority of its time in `spsolve()`, and that's already written in C. To make this much faster, you would somehow need to avoid solving that sparse system at all, and I don't understand the math here well enough to think of an alternative which doesn't involve solving a matrix.
However, I did find two ideas that, combined, make this around 36% faster.
The first thing I did was profile the program. It spent a lot of time on the `spsolve` line, so I broke that line up into multiple lines:
A = diagonal_weights + smoothness_penalies
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec="NATURAL")
Then I profiled this.
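(If you want to reproduce numbers like the ones below, the sketch here is roughly how per-line timings can be collected. That `line_profiler` was used is my assumption, based on the column layout of the output; `arpls` refers to the function defined above.)

```python
# Hypothetical reproduction sketch using line_profiler's Python API.
import numpy as np
from line_profiler import LineProfiler

spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter=",")

profiler = LineProfiler()
profiled_arpls = profiler(arpls)   # wrap the arpls() defined earlier
for spectrum in spectra[:100]:     # a subset is enough for stable percentages
    profiled_arpls(spectrum)
profiler.print_stats()             # prints Hits / Time / Per Hit / % Time per line
```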
Line #      Hits         Time  Per Hit   % Time  Line Contents
    69      1203  569457223.0 473364.3     22.6          A = diagonal_weights + smoothness_penalies
    70      1203    4759976.0   3956.8      0.2          b = current_weights * padded_spectrum
    71      1203 1352957577.0    1e+06     53.7          baseline[:] = spsolve(A, b, permc_spec="NATURAL")
The 53.7% of time spent in `spsolve` is hard to reduce, for the reasons mentioned above. However, another 22.6% is spent just adding two matrices together, which seems crazy to me. I think this is some combination of the overhead of converting DIA to CSC and CSC + CSC addition being slow.
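To check that hypothesis in isolation, here is a small standalone timing sketch. The size 4200 is an assumption on my part (~4000 wavelengths plus 2 × 100 points of edge padding); the matrices are built the same way as in the question.

```python
import timeit

import numpy as np
from scipy.sparse import diags_array, eye_array

n = 4200                                  # assumed: ~4000 wavelengths + 2 * 100 padding
current_weights = np.ones(n)
differences = eye_array(n, dtype=np.float64, format="csr")
for _ in range(2):                        # differentiation_order = 2
    differences = differences[1:] - differences[:-1]
smoothness_penalies = 1e6 * (differences.T @ differences)  # banded, 5 non-zero diagonals
diagonal_weights = diags_array(current_weights, format="dia")

# DIA + compressed-sparse: SciPy converts formats before it can add
print(timeit.timeit(lambda: diagonal_weights + smoothness_penalies, number=200))
# Converting once up front isolates the cost of the sparse addition itself
diagonal_weights_csr = diagonal_weights.tocsr()
print(timeit.timeit(lambda: diagonal_weights_csr + smoothness_penalies, number=200))
```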
As an alternative, I tried identifying the indices of the diagonal elements and their initial values up front, and then manipulating the diagonal entries directly with NumPy inside the loop:
# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
    The larger the parameter, the smoother the resulting baseline.\
    High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
    Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
    An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
    Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
    The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
    Padding is done using edge values to minimize boundary effects.
    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.
    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """
    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)
    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]
    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    smoothness_penalies = smoothness_penalies.tocoo()
    smoothness_penalies.sum_duplicates()
    # smoothness_penalies must have canonical format, or changing to csr could change
    # data order
    assert smoothness_penalies.has_canonical_format
    initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
    initial_diag = smoothness_penalies.data[initial_diag_index]
    smoothness_penalies = smoothness_penalies.tocsr()
    assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
    A = smoothness_penalies.copy()
    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Update the diagonal of the system matrix in place using the current weights
        A.data[initial_diag_index] = initial_diag + current_weights
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        b = current_weights * padded_spectrum
        baseline[:] = spsolve(A, b, permc_spec="NATURAL")
        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break  # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights
    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]
This is roughly 26% faster.
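A quick way to convince yourself that the in-place diagonal update builds exactly the same system matrix as `diags_array(current_weights) + smoothness_penalies` is a standalone sanity check along these lines (the size 500 and the random weights are arbitrary choices for the sketch):

```python
import numpy as np
from scipy.sparse import diags_array, eye_array

n = 500
differences = eye_array(n, dtype=np.float64, format="csr")
for _ in range(2):
    differences = differences[1:] - differences[:-1]
smoothness_penalies = 1e6 * (differences.T @ differences)

coo = smoothness_penalies.tocoo()
coo.sum_duplicates()
initial_diag_index = np.where(coo.row == coo.col)[0]
initial_diag = coo.data[initial_diag_index]
A = coo.tocsr()

weights = np.random.default_rng(0).uniform(0.1, 1.0, n)
A.data[initial_diag_index] = initial_diag + weights
reference = (diags_array(weights, format="dia") + smoothness_penalies).tocsr()
print(abs(A - reference).max())           # expected to print 0.0
```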
Next, I looked at the creation of the `smoothness_penalies` matrix. The key thing to notice is that this matrix depends on only three things: the size of the (padded) spectrum, the smoothing factor, and the differentiation order. These are the same for every call to `arpls()`. Since `arpls()` is called many times, this is a good candidate for memoization.
To do this, first move the creation of `smoothness_penalies` into its own function, then apply the `functools.lru_cache()` decorator. This reduces the time spent setting up `smoothness_penalies` from 16% of execution time to 0.2%.
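One caveat worth noting (my observation; the code below already handles it by copying the matrix before touching it): `lru_cache` returns the same cached objects on every call, so mutating a returned array in place would corrupt the cache for every later caller. A minimal illustration with a hypothetical `cached_zeros` helper:

```python
from functools import lru_cache

import numpy as np

@lru_cache
def cached_zeros(n: int) -> np.ndarray:
    # Created once per distinct n, then shared between all callers
    return np.zeros(n)

a = cached_zeros(3)
b = cached_zeros(3)
a[0] = 42.0
print(b[0])   # 42.0 -- both names refer to the single cached array
```

This is why the `arpls()` below does `A = smoothness_penalies.copy()` and only mutates the copy.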
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
from functools import lru_cache
@lru_cache
def initialize_smoothness_penalies(size, differentiation_order, smoothing_factor):
    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(size, dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]
    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    smoothness_penalies = smoothness_penalies.tocoo()
    smoothness_penalies.sum_duplicates()
    # smoothness_penalies must have canonical format, or changing to csr could change
    # data order
    assert smoothness_penalies.has_canonical_format
    initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
    initial_diag = smoothness_penalies.data[initial_diag_index]
    smoothness_penalies = smoothness_penalies.tocsr()
    assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
    return smoothness_penalies, initial_diag, initial_diag_index
# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
    The larger the parameter, the smoother the resulting baseline.\
    High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
    Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
    An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
    Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
    The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
    Padding is done using edge values to minimize boundary effects.
    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.
    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """
    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)
    smoothness_penalies, initial_diag, initial_diag_index = initialize_smoothness_penalies(padded_spectrum.shape[0], differentiation_order, smoothing_factor)
    A = smoothness_penalies.copy()
    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Update the diagonal of the system matrix in place using the current weights
        A.data[initial_diag_index] = initial_diag + current_weights
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        b = current_weights * padded_spectrum
        baseline[:] = spsolve(A, b, permc_spec="NATURAL")
        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break  # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights
    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]
This includes the previous optimization, and is roughly 36% faster than the original implementation.
The results agree with the original implementation to within about 10⁻⁷, a difference driven by passing `A` to `spsolve` in CSR rather than CSC format.
You could rewrite `initialize_smoothness_penalies` to return the matrix in CSC format, but that makes `spsolve` slower.
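If you want to check that agreement yourself, a comparison along these lines should do it (a sketch; `arpls_original` and `arpls_fast` are hypothetical names for the unmodified and optimized versions):

```python
import numpy as np

spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter=",")

baseline_original = np.apply_along_axis(arpls_original, axis=1, arr=spectra)
baseline_fast = np.apply_along_axis(arpls_fast, axis=1, arr=spectra)

# Largest relative deviation between the two implementations
relative_error = np.max(np.abs(baseline_fast - baseline_original) / np.abs(baseline_original))
print(relative_error)                                       # on the order of 1e-7
print(np.allclose(baseline_fast, baseline_original, rtol=1e-6))
```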
Finally, here are some ideas that I investigated but couldn't get to work. You may have more luck.