Tags: python, pandas, numpy, vectorization

Arnaud Legoux Moving Average (ALMA) in NumPy


I'd like to write a vectorized version of the code that calculates the Arnaud Legoux Moving Average (ALMA) using NumPy (or Pandas). Could you help me with this, please? Thanks.

The non-vectorized version looks like the following.

import numpy as np

def NPALMA(pnp_array, **kwargs):
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    # window length
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize   # center of the Gaussian weights
    s = length / sigma   # controls the width of the Gaussian
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            # Weighted sum over the window pnp_array[l-length : l];
            # note that for l == asize the first index is -1, which
            # wraps around to the last element.
            for i in range(length):
                im = i - m
                wtd = np.exp(-(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma

Solution

  • Starting Approach

    We can create sliding windows along the first axis and then use tensor multiplication with the array of wtds values for the sum-reductions.
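    To see that tensordot reduction in isolation, here is a toy sketch (made-up numbers): contracting the window axis of a stacked-windows array against a weight vector gives one weighted sum per window and column.

    ```python
    import numpy as np

    # 4 stacked windows of length 3 over 2 columns (toy data)
    wins = np.arange(24.0).reshape(4, 3, 2)
    w = np.array([1.0, 2.0, 3.0])

    # Contract the window axis: res[k, c] = sum_i wins[k, i, c] * w[i]
    res = np.tensordot(wins, w, axes=((1,), (0,)))
    ```

    `res` has shape `(4, 2)`, and `res[k, c]` equals `w.dot(wins[k, :, c])`.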

    The implementation would look something like this -

    # Get all wtd values in an array
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    
    # Get the sliding windows for input array along first axis
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    
    # Initialize o/p array
    out = np.zeros(pnp_array.shape)
    
    # Get sum-reductions for the windows which don't need wrapping over
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    
    # The first computed row, out[length-1], wraps around to the last
    # input row (matching the original indexing), so handle it separately.
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    
    # Finally perform the divisions
    out /= wtds.sum()
    

    Function to get the sliding windows: strided_axis0 is from here.
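    In case the linked helper isn't at hand, here is one common way to implement strided_axis0 (a sketch using np.lib.stride_tricks.as_strided; on NumPy >= 1.20 you could instead use np.lib.stride_tricks.sliding_window_view, which puts the window axis last):

    ```python
    import numpy as np

    def strided_axis0(a, L):
        """Overlapping windows of length L along axis 0, as a zero-copy view.

        For a 2D input of shape (m, n), the result has shape
        (m - L + 1, L, n), with result[k] == a[k:k+L].
        """
        m, n = a.shape
        s0, s1 = a.strides
        return np.lib.stride_tricks.as_strided(
            a, shape=(m - L + 1, L, n), strides=(s0, s0, s1))
    ```

    As with any as_strided view, treat the result as read-only, since the windows share memory with the input.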

    Boost with 1D convolution

    Those multiplications with wtds values and then their sum-reductions are basically convolution along the first axis. As such, we can use scipy.ndimage.convolve1d along axis=0. This would be much faster given the memory efficiency, as we won't be creating huge sliding windows.

    The implementation would be -

    from scipy.ndimage import convolve1d as conv
    
    avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')
    

    Thus, out[length-1:], i.e. the non-zero rows, would be the same as avgs[:-length+1].

    There could be some precision difference if we are working with really small kernel values in wtds. So, keep that in mind when using this convolution method.

    Runtime test

    Approaches -

    def original_app(pnp_array, length, m, dss):
        asize = length - 1
        alma = np.zeros(pnp_array.shape)
        wtd_sum = np.zeros(pnp_array.shape)
    
        for l in range(len(pnp_array)):
            if l >= asize:
                for i in range(length):
                    im = i - m
                    wtd = np.exp( -(im * im) / dss)
                    alma[l] += pnp_array[l - length + i] * wtd
                    wtd_sum[l] += wtd
                alma[l] = alma[l] / wtd_sum[l]
        return alma
    
    def vectorized_app1(pnp_array, length, m, dss):
        wtds = np.exp(-(np.arange(length) - m)**2/dss)
        pnp_array3D = strided_axis0(pnp_array,len(wtds))
        out = np.zeros(pnp_array.shape)
        out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
        out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
        out /= wtds.sum()
        return out
    
    def vectorized_app2(pnp_array, length, m, dss):
        wtds = np.exp(-(np.arange(length) - m)**2/dss)
        return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')
    

    Timings -

    In [470]: np.random.seed(0)
         ...: m,n = 1000,100
         ...: pnp_array = np.random.rand(m,n)
         ...: 
         ...: length = 6
         ...: sigma = 0.3
         ...: offset = 0.5
         ...: 
         ...: asize = length - 1
         ...: m = np.floor(offset * asize)
         ...: s = length  / sigma
         ...: dss = 2 * s * s
         ...: 
    
    In [471]: %timeit original_app(pnp_array, length, m, dss)
         ...: %timeit vectorized_app1(pnp_array, length, m, dss)
         ...: %timeit vectorized_app2(pnp_array, length, m, dss)
         ...: 
    10 loops, best of 3: 36.1 ms per loop
    1000 loops, best of 3: 1.84 ms per loop
    1000 loops, best of 3: 684 µs per loop
    
    In [472]: np.random.seed(0)
         ...: m,n = 10000,1000 # rest same as previous one
    
    In [473]: %timeit original_app(pnp_array, length, m, dss)
         ...: %timeit vectorized_app1(pnp_array, length, m, dss)
         ...: %timeit vectorized_app2(pnp_array, length, m, dss)
         ...: 
    1 loop, best of 3: 503 ms per loop
    1 loop, best of 3: 222 ms per loop
    10 loops, best of 3: 106 ms per loop
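
    For completeness, the window-based approach can be wrapped into a single self-contained function that reproduces the original NPALMA output exactly, including the wrap-around of the first computed row. This is a sketch using np.lib.stride_tricks.sliding_window_view (NumPy >= 1.20); the name alma_vectorized is mine, not from the original post.

    ```python
    import numpy as np

    def alma_vectorized(x, length, sigma, offset):
        """Vectorized ALMA matching NPALMA (first length-1 rows stay zero)."""
        asize = length - 1
        m = offset * asize          # center of the Gaussian weights
        s = length / sigma
        dss = 2 * s * s
        wtds = np.exp(-(np.arange(length) - m) ** 2 / dss)

        out = np.zeros(x.shape)
        # wins[k] is the window x[k:k+length]; shape (rows-length+1, cols, length)
        wins = np.lib.stride_tricks.sliding_window_view(x, length, axis=0)
        # Rows l >= length use the window x[l-length : l]
        out[length:] = np.tensordot(wins, wtds, axes=((2,), (0,)))[:-1]
        # The first computed row wraps around to the last input row,
        # matching the original loop's indexing
        out[asize] = wtds.dot(x[np.r_[-1, 0:asize]])
        return out / wtds.sum()
    ```

    The result agrees with the double loop in NPALMA to floating-point precision, while avoiding both the Python-level loops and any data copies for the windows.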