Tags: python, numpy, sparse-matrix, convolution, rolling-computation

Non-uniform domain moving average using sparse matrices


A moving average over a non-uniform domain works fine:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def convolve(s, f):
    """Compute the convolution of series S with a universal function F
    (see https://numpy.org/doc/stable/reference/ufuncs.html).
    This amounts to a moving average of S with weights F based on S.index."""
    index_v = s.index.values
    weight_mx = f(index_v - index_v[:, np.newaxis])
    weighted_sum = np.sum(s.values[:, np.newaxis] * weight_mx, axis=0)
    normalization = np.sum(weight_mx, axis=0)
    return pd.Series(weighted_sum/normalization, index=s.index)

size = 1000
df = pd.DataFrame({"x":np.random.normal(size=size)},
                  index=np.random.exponential(size=size)).sort_index()
def f(x):
    return np.exp(-x*x*30)
df["avg"] = convolve(df.x, f)
plt.scatter(df.index, df.avg, s=1, label="average")
plt.scatter(df.index, df.x, s=1, label="random")
plt.title("Moving Average for random data")
plt.legend()

[plot: Moving Average for random data]

However, this allocates an O(size^3) array:

MemoryError: Unable to allocate 22.0 TiB for an array with shape (14454, 14454, 14454) and data type float64

Is it possible to "sparsify" the function convolve?

Specifically, f normally returns non-zero values only within a fairly narrow band of its input.


Solution

  • weighted_sum = np.sum(s.values[:, np.newaxis] * weight_mx, axis=0)
    

    is equivalent to

    weighted_sum = weight_mx @ s.values
    

    which is computed without creating the large intermediate array. (Strictly, the broadcast-and-sum computes weight_mx.T @ s.values; the two agree here because f is even, so weight_mx is symmetric.)
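A quick numeric sanity check of this equivalence on a small symmetric kernel (f even, as in the question):

```python
import numpy as np

rng = np.random.default_rng(0)
n = 6
idx = np.sort(rng.exponential(size=n))
s = rng.normal(size=n)

def f(x):
    return np.exp(-x * x * 30)  # even, so the weight matrix is symmetric

weight_mx = f(idx - idx[:, np.newaxis])

# original broadcast-and-sum: materializes an (n, n) intermediate
a = np.sum(s[:, np.newaxis] * weight_mx, axis=0)
# matrix-vector product: no intermediate beyond the weight matrix itself
b = weight_mx @ s

print(np.allclose(a, b))  # True
```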

    The proof is in the pudding


    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    
    def convolve(s, f):
        """Compute the convolution of series S with a universal function F
        (see https://numpy.org/doc/stable/reference/ufuncs.html).
        This amounts to a moving average of S with weights F based on S.index."""
        index_v = s.index.values
        weight_mx = f(index_v - index_v[:, np.newaxis])
        weighted_sum = weight_mx @ s.values
        normalization = np.sum(weight_mx, axis=0)
        return pd.Series(weighted_sum/normalization, index=s.index)
    
    size = 20_000 # 1000
    df = pd.DataFrame({"x":np.random.normal(size=size)},
                      index=np.random.exponential(size=size)).sort_index()
    def f(x):
        return np.exp(-x*x*30)
    df["avg"] = convolve(df.x, f)
    plt.scatter(df.index, df.avg, s=1, label="average")
    plt.scatter(df.index, df.x, s=1, label="random")
    plt.title("Moving Average for random data")
    plt.legend()
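To address the "sparsify" part of the question directly: since f is effectively zero beyond a narrow band and the index is sorted, the weight matrix can be built as a banded sparse matrix, so only the non-negligible entries are ever evaluated. The sketch below uses scipy.sparse; the function name convolve_sparse, the radius parameter, and the cutoff value are my own illustrative choices, not from the original post (for this f, exp(-30*dx^2) drops below ~1e-13 once |dx| > 1, so radius=1.0 loses essentially nothing).

```python
import numpy as np
import pandas as pd
from scipy import sparse

def convolve_sparse(s, f, radius):
    """Like convolve, but only evaluates f for index pairs closer than
    `radius` (beyond which f is assumed to be ~0). Requires s.index sorted."""
    idx = s.index.values
    n = len(idx)
    # for each row i, the columns j with |idx[i] - idx[j]| <= radius
    # form a contiguous window [lo[i], hi[i]) because idx is sorted
    lo = np.searchsorted(idx, idx - radius, side="left")
    hi = np.searchsorted(idx, idx + radius, side="right")
    rows = np.repeat(np.arange(n), hi - lo)
    cols = np.concatenate([np.arange(l, h) for l, h in zip(lo, hi)])
    vals = f(idx[rows] - idx[cols])
    weight_mx = sparse.csr_matrix((vals, (rows, cols)), shape=(n, n))
    weighted_sum = weight_mx @ s.values
    # the band is symmetric (|dx| <= radius) and f is even, so row sums
    # equal the column sums used in the dense version
    normalization = np.asarray(weight_mx.sum(axis=1)).ravel()
    return pd.Series(weighted_sum / normalization, index=s.index)

size = 2_000  # kept modest for the sketch
df = pd.DataFrame({"x": np.random.normal(size=size)},
                  index=np.random.exponential(size=size)).sort_index()
def f(x):
    return np.exp(-x * x * 30)

df["avg"] = convolve_sparse(df.x, f, radius=1.0)
```

Note that the memory saving depends on how narrow f is relative to the data density: with an exponential index, points cluster near 0, so a wide radius there makes the band nearly dense.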