Search code examples
pythonnumpyscipysignal-processingnumba

Numerically Integrating Signals with Absolute Value


Suppose I have a numpy s array of acceleration values representing some signal sampled at a fixed rate dt. I want to compute the cumulative absolute velocity, i.e. np.trapz(np.abs(s), dx=dt).

This is great except if dt is "large" (e.g. 0.01) and the signal s is both long and crossing between positive and negative values frequently (an unfortunately common occurrence), an error is accumulated from the fact that taking |s| drops the information about the original sign of s. See the picture for a better idea of what this error actually looks like.

I have some custom code that can correctly account for this error by creating a modified trapezium rule with numba, but there are other very similar functions that I need to implement doing things like np.trapz(np.square(s), dx=dt). Is there an off the shelf solution for this sort of numerical integral that:

  1. Can be reused for both integrals np.trapz(np.square(s), dx=dt) and np.trapz(np.abs(s), dx=dt), etc...
  2. Is ideally vectorised so that integration can be done for tens of thousands of signals at once in a reasonable time?

For the record, the following parallel numba code is what I am using to integrate the signals

@numba.njit(parallel=True)
def cav_integrate(
    waveform: npt.NDArray[np.float32], dt: float
) -> npt.NDArray[np.float32]:
    """Compute the Cumulative Absolute Velocity (CAV) of a waveform."""
    cav = np.zeros((waveform.shape[0],), dtype=np.float32)
    for i in range(waveform.shape[0]):
        for j in range(waveform.shape[1] - 1):
            if np.sign(waveform[i, j]) * np.sign(waveform[i, j + 1]) >= 0:
                cav[i] += dt / 2 * (np.abs(waveform[i, j]) + np.abs(waveform[i, j + 1]))
            else:
                slope = (waveform[i, j + 1] - waveform[i, j]) / dt
                x0 = -waveform[i, j] / slope
                cav[i] += x0 / 2 * np.abs(waveform[i, j]) + (dt - x0) / 2 * np.abs(
                    waveform[i, j + 1]
                )
    return cav

Example Data

I have uploaded a small broadband ground motion simulation to a dropbox link (approx. 91MiB) for testing. This data comes from a finite difference simulation of a recent earthquake near Wellington, New Zealand plus some empirically derived high-frequency noise. The file is an HDF5 containing some station data (irrelevant for our purposes), and simulation waveforms in the "waveforms" key. The array has shape (number of stations, timesteps, components) = (1433, 5876, 3). The 1d numpy array waveform[i, :, j] is the simulated acceleration for the ith station in the jth component. We need to compute the cumulative absolute velocity (CAV) for each component and each station independently. The benchmark code to do this can be found below:

import time

import h5py
import numba
import numpy as np
import numpy.typing as npt

broadband_input_file = h5py.File("broadband.h5", "r")
# Load the entire dataset into memory so that the first method is not arbitrarily slowed down by file I/O
waveforms = np.array(broadband_input_file["waveforms"])
dt = 0.01

start = time.process_time()
cav_naive = np.trapz(np.abs(waveforms), dx=dt, axis=1)
print(f"CAV naive time: {time.process_time() - start}")


@numba.njit
def cav_integrate(
    waveform: npt.NDArray[np.float32], dt: float
) -> npt.NDArray[np.float32]:
    """Compute the Cumulative Absolute Velocity (CAV) of a waveform."""
    cav = np.zeros((waveform.shape[0], waveform.shape[-1]), dtype=np.float32)
    for c in range(waveform.shape[-1]):
        for i in range(waveform.shape[0]):
            for j in range(waveform.shape[1] - 1):
                if np.sign(waveform[i, j, c]) * np.sign(waveform[i, j + 1, c]) >= 0:
                    cav[i, c] += (
                        dt
                        / 2
                        * (np.abs(waveform[i, j, c]) + np.abs(waveform[i, j + 1, c]))
                    )
                else:
                    slope = (waveform[i, j + 1, c] - waveform[i, j, c]) / dt
                    x0 = -waveform[i, j, c] / slope
                    cav[i, c] += x0 / 2 * np.abs(waveform[i, j, c]) + (
                        dt - x0
                    ) / 2 * np.abs(waveform[i, j + 1, c])
    return cav


# Warm up the numba compilation cache
_ = cav_integrate(waveforms, dt)

start = time.process_time()
cav_bespoke = cav_integrate(waveforms, dt)
print(f"Custom CAV time: {time.process_time() - start}")


print(
    f"Ratio naive CAV / custom CAV (0, 25, 50, 75, 100% quartiles): {np.percentile(cav_naive / cav_bespoke, [0, 25, 50, 75, 100])}"
)

Which gives the following output

CAV naive time: 0.14353649699999993
Custom CAV time: 0.11182449700000019
Ratio naive CAV / custom CAV (0, 25, 50, 75, 100% quartiles): [1.00607312 1.00999796 1.01163089 1.01318455 1.02221394]

These differences are reasonably small, better examples of larger differences are shown in the comments. Some of the observed waveforms have 20-40% differences between the methods. Even 2% differences might be important for some of the researchers I support. Note also that the CAV calculation is done on a single thread for comparison, but I would parallelise both methods in reality for the largest waveform arrays (having 6 or 7x the stations and 10-20x the timesteps depending on the temporal resolution of the simulation). Funnily enough the parallel overhead for this small file makes cav_integrate slower than the naive approach if enabled.

We actually do the CAV calculation for all linear combinations cos(theta) * waveform[i, :, 0] + sin(theta) * waveform[i, :, 1] where theta = 0, 1,...180° to obtain orientation independent measurements of CAV. This is part of the reason it needs to be fast.


Solution

  • This answer focus more on the performance/vectorization aspects than numerical integration.


    Faster implementation

    Is ideally vectorised so that integration can be done for tens of thousands of signals at once in a reasonable time?

    Technically, Numba code can run with njit (and without errors) are always vectorized based on the definition on Numpy (a vectorized function is basically a natively compiled function). However, it can be made faster. The first thing to do is to use multiple threads so the code can benefit from multiple CPU cores.

    Funnily enough the parallel overhead for this small file makes cav_integrate slower than the naive approach if enabled.

    This is because there is 2 issues:

    • process_time returns the sum of the system and user CPU time (i.e. amount of parallel work) of the current process. Its does not measures the wall clock time. Thus, the benchmark is biased. You should use time() to measure the wall clock time instead.
    • Numba don't automatically parallelize loops. It only parallelize some basic array operation but to parallelize loops, you need to use prange instead of range or otherwise the loop will be sequential (so the code of the question is not actually parallel).

    To efficiently parallelize the code, we should swap the i-based and c-based loops.

    Moreover, there are other things to consider when it comes to performance:

    • you should avoid updating cav[i, c] and accumulate values in a local variable instead (be careful to use it only within the parallel loop).
    • you should also be careful to avoid implicit 64-bit FP numbers conversions because they are often more expensive and certainly not needed here since you operate on 32-bit FP numbers. For example, dt is a 64-bit number so any operation involving it will results in a 64-bit number.
    • I think you can use a min-max check instead of a sign-product-based one (the former is more efficient and SIMD friendly)
    • you should use math tricks so to avoid some expensive mathematical computations like divisions (and avoiding repeated computation like multiplication by a constant for all terms of a sum)
    • adapt the code so to benefit from SIMD units and improve memory accesses (this can typically improve the scalability of the code): this is typically done by physically swapping the c and j axis (though this requires a different input layout).

    Here is the modified code considering all points except the last one (about SIMD and the memory layout):

    @numba.njit(parallel=True)
    def cav_integrate_opt(
        waveform: npt.NDArray[np.float32], dt: float
    ) -> npt.NDArray[np.float32]:
        """Compute the Cumulative Absolute Velocity (CAV) of a waveform."""
        cav = np.zeros((waveform.shape[0], waveform.shape[-1]), dtype=np.float32)
        dtf = np.float32(dt)
        half = np.float32(0.5)
        for i in numba.prange(waveform.shape[0]):
            for c in range(waveform.shape[-1]):
                tmp = np.float32(0)
                for j in range(waveform.shape[1] - 1):
                    v1 = waveform[i, j, c]
                    v2 = waveform[i, j + 1, c]
                    if min(v1, v2) >= 0 or max(v1, v2) <= 0:
                        tmp += dtf * (np.abs(v1) + np.abs(v2))
                    else:
                        inv_slope = dtf / (v2 - v1)
                        x0 = -v1 * inv_slope
                        tmp += x0 * np.abs(v1) + (dtf - x0) * np.abs(v2)
                cav[i, c] = tmp * half
        return cav
    

    Here are performance results on my AMD Ryzen 5700U CPU (with 8 cores):

    naive trapz (seq):              315 ms
    initial cav_integrate (seq):    244 ms
    optimized cav_integrate (par):   10 ms   <-----
    

    Th optimized implementation is 25 times faster than cav_integrate and 31 times faster than the naive approach.

    For better performance, please consider the last optimization point (more precisely about SIMD). That being said, this can be a bit complex to perform here. It might requires the else branch to be rarely executed (i.e. <5%) so to be pretty efficient.


    More generic integration

    Can be reused for both integrals np.trapz(np.square(s), dx=dt) and np.trapz(np.abs(s), dx=dt), etc...

    Here are some thoughts:

    For np.trapz(np.abs(s), dx=dt), a solution consists in computing the minimum value of the signal, then subtract the minimum to the signal, compute np.trapz of the resulting adapted signal so to finally correct the result. This solution is more efficient than you current one because it can benefit from SIMD instructions. However, it does not work for np.square.

    A generic solution is to add new points close to the problematic area (thanks to an interpolation function). This solution is not optimal because to increase the computational time and is not numerically exact either (though using a lot of point should give a pretty accurate solution). You do not need to interpolate all points nor to generate new array for the whole array : you can do that line by line or even on the fly (a bit more complicated). This can save a lot of RAM and computation time.

    Another generic solution is to pass a generic function in parameter to the numba function for computing differently the case where the sign change. However, this solution should be significantly slower than your specialized solution because it does not benefit from SIMD instructions and add an expensive function call that can hardly be inlined.

    You can mix the two last solution so to build a generic solution which should be still faster once running with multiple threads and optimized like above. The idea is to add just one point where the curve cross the line y=0 and and split the integration in two parts. A linear interpolation should give results similar to cav_integrate_opt (if not even equal). Here is an example:

    @numba.njit(parallel=True)
    def cav_integrate_opt_generic(waveform, dt, fun):
        cav = np.zeros((waveform.shape[0], waveform.shape[-1]), dtype=np.float32)
        dtf = np.float32(dt)
        half = np.float32(0.5)
        for i in numba.prange(waveform.shape[0]):
            for c in range(waveform.shape[-1]):
                tmp = np.float32(0)
                for j in range(waveform.shape[1] - 1):
                    v1 = waveform[i, j, c]
                    v2 = waveform[i, j + 1, c]
                    if min(v1, v2) < 0 and max(v1, v2) > 0:
                        # Basic linear interp
                        # Consider passing another generic function in 
                        # parameter to find roots if needed (more expensive).
                        inv_slope = dtf / (v2 - v1)
                        x0 = -v1 * inv_slope
                        tmp += x0 * fun(v1) + (dtf - x0) * fun(v2)
                    else:
                        tmp += dtf * (fun(v1) + fun(v2))
                cav[i, c] = tmp * half
        return cav
    
    # Needs to be wrapped in a Numba function for sake of performance
    # (so Numba can call it directly like a native C function)
    @numba.njit
    def numba_abs(y):
        return np.abs(y)
    
    # Note `cav_integrate_opt_generic` is recompiled for each different provided function.
    cav_bespoke = cav_integrate_opt_generic(waveforms, dt, numba_abs)
    

    If you want to do that using a higher-order interpolation and integration then you certainly need to consider more points and a generic function to find roots (which is certainly much more expensive when it is even possible to find analytical solutions).

    It turns out this more generic function is only 5~10% slower for np.abs on my machine. Result are the same for np.abs.