Search code examples
pythonnumpyoptimizationscipystatistics

Trying to optimize my ARPLS implementation


I'm trying to optimize my spectral baseline fitting algorithm (based on this one), since, from my understanding, NumPy's apply_along_axis performance is not significantly higher than simply looping through an array's indices.

It currently takes ~0.15 seconds to process a spectrum of ~4000 wavelengths, but applied to millions of spectra, it would take a while, to say the least. Am I missing an obvious vectorized solution, or is the algorithm simply that heavy? Example spectra file provided here, and its respective baseline here.

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve


# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)

    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        diagonal_weights: dia_array = diags_array(current_weights, dtype=np.float64, format="dia")
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        baseline[:] = spsolve(diagonal_weights + smoothness_penalies, current_weights * padded_spectrum, permc_spec="NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

# (M, N) spectra, where M is the number of signals acquired and N, the amount of wavelengths each signal contains
spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter=",")

baseline = np.apply_along_axis(arpls, axis=1, arr=spectra)
np.savetxt("baseline.csv", baseline, delimiter=",")

Solution

  • I don't think there's anything that will make this wildly faster - it spends the majority of its time in spsolve(), and that's already written in C. To make this much faster, you would somehow need to avoid solving this matrix, and I don't understand the math here well enough to think of an alternative which doesn't involve solving a matrix.

    However, I did find two ideas that can make this around 36% faster.

    • Faster diagonal manipulation
    • Caching the creation of the penalty matrix

    The first thing I did was profile the program. I noticed that the program spent a lot of time on the spsolve line, so I broke that line up into multiple lines.

    A = diagonal_weights + smoothness_penalies
    b = current_weights * padded_spectrum
    baseline[:] = spsolve(A, b, permc_spec="NATURAL")
    

    Then I profiled this.

        69      1203  569457223.0 473364.3     22.6          A = diagonal_weights + smoothness_penalies
        70      1203    4759976.0   3956.8      0.2          b = current_weights * padded_spectrum
        71      1203 1352957577.0    1e+06     53.7          baseline[:] = spsolve(A, b, permc_spec="NATURAL")
    

    Spending 53.7% of its time in spsolve is hard to reduce, for the reasons I just mentioned. However, 22.6% of its time is also spent adding two matrices together, which seems crazy to me. I think this is some combination of overhead from converting DIA to CSC, and CSC + CSC addition being slow.

    Faster diagonal manipulation

    As an alternative, I tried identifying the indices of diagonal elements, the initial value of those elements, and then manipulating the diagonal elements directly using NumPy.

    # Sparse array implementation
    def arpls(
        spectrum: NDArray[np.floating],
        smoothing_factor: float = 1E6,
        convergence_tolerance: float = 0.05,
        differentiation_order: int = 2,
        max_iterations: int = 100,
        edge_padding: int = 100,
    ) -> NDArray[np.floating]:
        """
        Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
    
        ### Parameters
        - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
        - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
            The larger the parameter, the smoother the resulting baseline.\
            High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
        - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
            Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
        - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
            An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
            Higher orders allow for more complex baseline shapes.
        - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
            The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
        - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
            Padding is done using edge values to minimize boundary effects.
    
        ### Returns
        - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
    
        ### Raises
        - `ValueError` -- If the ratio is not between 0 and 1.
    
        ### Reference
        - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
    
        ### Notes
        The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
        The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
        """
    
        # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
        padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
        # Step 2: Initialize the weights array.
        current_weights = np.ones_like(padded_spectrum)
    
        # Step 3: Initialize a difference array for enforcing smoothness
        differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
        for _ in range(differentiation_order):
            # Update the difference array to account for higher-order differentials if required
            differences = differences[1:] - differences[:-1]
    
        # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
        smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
        smoothness_penalies = smoothness_penalies.tocoo()
        smoothness_penalies.sum_duplicates()
        # smoothness_penalies must have canonical format, or changing to csr could change
        # data order
        assert smoothness_penalies.has_canonical_format
        initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
        
        initial_diag = smoothness_penalies.data[initial_diag_index]
        smoothness_penalies = smoothness_penalies.tocsr()
        assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
        A = smoothness_penalies.copy()
    
        # Step 5: Iteratively refine the estimated baseline
        baseline = np.zeros_like(padded_spectrum)
        for _ in range(max_iterations):
            # Step 5a: Generate a diagonal weights array from the current weights
            A.data[initial_diag_index] = initial_diag + current_weights
            # Step 5b: Solve for the baseline using the current weights and smoothness penalties
            b = current_weights * padded_spectrum
            baseline[:] = spsolve(A, b, permc_spec="NATURAL")
    
            # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
            residuals = padded_spectrum - baseline
            # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
            negative_residuals = residuals[residuals < 0]
            nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
            nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
    
            # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
            # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
            exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
            exponents.clip(-500, 500, out=exponents)
            updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
    
            # Step 5f: Check for convergence by measuring how much the weights have changed
            if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
                break # Stop iterating if the change in weights is below the specified tolerance
            current_weights[:] = updated_weights
    
        # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
        return baseline[edge_padding:-edge_padding]
    

    This is roughly 26% faster.

    Caching the creation of the penalty matrix

    Next, I looked at the creation of the smoothness_penalies matrix. The key thing to notice about this matrix is that it depends on three things: the size of the spectrum, the smoothing factor, and the differentiation_order. These are the same for every call to arpls(). Since arpls() is called many times, this is a good candidate for memoization.

    To do this, first move the creation of smoothness_penalies into its own function. Next, apply the functools.lru_cache() decorator. This reduces the time spent setting up smoothness_penalies from 16% of execution time to 0.2%.

    import numpy as np
    from numpy.typing import NDArray
    from scipy.linalg import norm
    from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
    from scipy.sparse.linalg import spsolve
    from functools import lru_cache
    
    
    @lru_cache
    def initialize_smoothness_penalies(size, differentiation_order, smoothing_factor):
        # Step 3: Initialize a difference array for enforcing smoothness
        differences: csr_array = eye_array(size, dtype=np.float64, format="csr")
        for _ in range(differentiation_order):
            # Update the difference array to account for higher-order differentials if required
            differences = differences[1:] - differences[:-1]
    
        # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
        smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
        smoothness_penalies = smoothness_penalies.tocoo()
        smoothness_penalies.sum_duplicates()
        # smoothness_penalies must have canonical format, or changing to csr could change
        # data order
        assert smoothness_penalies.has_canonical_format
        initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
        
        initial_diag = smoothness_penalies.data[initial_diag_index]
        smoothness_penalies = smoothness_penalies.tocsr()
        assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
        return smoothness_penalies, initial_diag, initial_diag_index
    
    
    # Sparse array implementation
    def arpls(
        spectrum: NDArray[np.floating],
        smoothing_factor: float = 1E6,
        convergence_tolerance: float = 0.05,
        differentiation_order: int = 2,
        max_iterations: int = 100,
        edge_padding: int = 100,
    ) -> NDArray[np.floating]:
        """
        Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
    
        ### Parameters
        - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
        - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
            The larger the parameter, the smoother the resulting baseline.\
            High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
        - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
            Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
        - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
            An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
            Higher orders allow for more complex baseline shapes.
        - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
            The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
        - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
            Padding is done using edge values to minimize boundary effects.
    
        ### Returns
        - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
    
        ### Raises
        - `ValueError` -- If the ratio is not between 0 and 1.
    
        ### Reference
        - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
    
        ### Notes
        The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
        The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
        """
    
        # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
        padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
        # Step 2: Initialize the weights array.
        current_weights = np.ones_like(padded_spectrum)
        smoothness_penalies, initial_diag, initial_diag_index = initialize_smoothness_penalies(padded_spectrum.shape[0], differentiation_order, smoothing_factor)
    
        A = smoothness_penalies.copy()
    
        # Step 5: Iteratively refine the estimated baseline
        baseline = np.zeros_like(padded_spectrum)
        for _ in range(max_iterations):
            # Step 5a: Generate a diagonal weights array from the current weights
            A.data[initial_diag_index] = initial_diag + current_weights
            # Step 5b: Solve for the baseline using the current weights and smoothness penalties
            b = current_weights * padded_spectrum
            baseline[:] = spsolve(A, b, permc_spec="NATURAL")
    
            # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
            residuals = padded_spectrum - baseline
            # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
            negative_residuals = residuals[residuals < 0]
            nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
            nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
    
            # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
            # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
            exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
            exponents.clip(-500, 500, out=exponents)
            updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
    
            # Step 5f: Check for convergence by measuring how much the weights have changed
            if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
                break # Stop iterating if the change in weights is below the specified tolerance
            current_weights[:] = updated_weights
    
        # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
        return baseline[edge_padding:-edge_padding]
    

    This includes the previous optimization, and is roughly 36% faster than the original implementation.

    Comparison to original results

    The results are within a factor of 10-7 of the original, which is driven by changing the input of A to spsolve from CSC to CSR.

    You could re-write initialize_smoothness_penalies to provide the results in CSC, but this does make spsolve slower.

    Further work

    Here are a set of ideas that I investigated and couldn't get to work. You may have more luck.

    • Iterative solvers: spsolve is a direct solver, but scipy has a number of iterative solvers. In theory, if you could identify a good initial guess for x0, like the baseline from the last iteration, then you could get the answer in very few iterations. In practice I could not get this to be faster than the direct solver. The best iterative solver that I found was cg, which was still slower.
    • umfpack: There is a note in the SciPy documentation that implies that umfpack based spsolve is faster than SciPy's solution. However, this depends on the scikit-umfpack package, which I couldn't get to install.