Tags: python, pandas, vectorization, aggregation, stock

How to vectorize complex cumulative aggregation problem?


Dataset:

date        time_index  identifier       value  cum_value  bar_index  desired_output
2023-06-01           1  stackoverflow        5          5        NaN               0
2023-06-01           2  stackoverflow       10         15        NaN               0
2023-06-01           3  stackoverflow       10         25        NaN               1
2023-06-01           1  cross_validated      4          4        NaN               0
2023-06-01           2  cross_validated      6         10        NaN               0
2023-06-01           3  cross_validated     20         30        NaN               1
2023-06-01           4  cross_validated      5         35        NaN               2
2023-06-02           1  stackoverflow        2          2        NaN               0
2023-06-02           2  stackoverflow       10         12        NaN               0
2023-06-02           1  cross_validated     20         20        NaN               0
2023-06-02           2  cross_validated      3         23        NaN               1
2023-06-02           3  cross_validated      3         26        NaN               1

Code that generates the dataset:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "date": ["2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-02", "2023-06-02", "2023-06-02", "2023-06-02", "2023-06-02"],
    "time_index": [1, 2, 3, 1, 2, 3, 4, 1, 2, 1, 2, 3],
    "identifier": ["stackoverflow", "stackoverflow", "stackoverflow", "cross_validated", "cross_validated", "cross_validated", "cross_validated", 
               "stackoverflow", "stackoverflow", "cross_validated", "cross_validated", "cross_validated"],
    "value": [5, 10, 10, 4, 6, 20, 5, 2, 10, 20, 3, 3]
})
df["cum_value"] = df.groupby(["identifier", "date"])["value"].cumsum()
df["bar_index"] = np.nan
df["desired_output"] = [0, 0, 1, 0, 0, 1, 2, 0, 0, 0, 1, 1]

I want to sample bar_index for each identifier and date according to a fixed (for now) threshold τ = 10, using the value and/or cum_value columns.

  • τ = 10
  • date: 2023-06-01 = d1 & 2023-06-02 = d2
  • identifier: stackoverflow = id1 & cross_validated = id2
  • time_index ∈ {t1, t2,...,tn} ∀ d, id
  1. Observation {id1, d1, t1} has a value below the threshold of 10, so we continue to the next entries. Adding the values of {id1, d1, t1} and {id1, d1, t2} gives a cum_value (cumulative value) of 15, which exceeds the threshold, so we sample both {id1, d1, t1} and {id1, d1, t2} as bar_index 0.

  2. If we encounter an observation with a very large value (for example {id2, d1, t3}) and the previous bar has ended (its cumulative value exceeded the threshold as of the last trade), we sample this observation alone as its own bar_index. The next observation then starts a new accumulation (in theory).

Current non-vectorized approach:

def aggregate_bars(group, threshold):
    cum_value = 0
    bar_index = 0

    for i in range(len(group)):
        cum_value += group.iloc[i]["value"]
        if cum_value >= threshold:
            group["bar_index"].iloc[i] = bar_index
            bar_index += 1
            cum_value = 0
        elif cum_value < threshold:
            group["bar_index"].iloc[i] = bar_index

    return group

df = df.groupby(["identifier", "date"]).apply(lambda x: aggregate_bars(x, 10))
df

Output:

date        time_index  identifier       value  cum_value  bar_index  desired_output
2023-06-01           1  stackoverflow        5          5        0.0               0
2023-06-01           2  stackoverflow       10         15        0.0               0
2023-06-01           3  stackoverflow       10         25        1.0               1
2023-06-01           1  cross_validated      4          4        0.0               0
2023-06-01           2  cross_validated      6         10        0.0               0
2023-06-01           3  cross_validated     20         30        1.0               1
2023-06-01           4  cross_validated      5         35        2.0               2
2023-06-02           1  stackoverflow        2          2        0.0               0
2023-06-02           2  stackoverflow       10         12        0.0               0
2023-06-02           1  cross_validated     20         20        0.0               0
2023-06-02           2  cross_validated      3         23        1.0               1
2023-06-02           3  cross_validated      3         26        1.0               1

How can the code be vectorized so it can efficiently process trillions of rows?


Solution

  • It is unlikely that your function can be vectorised exactly as stated. The choice for element i depends directly on the choice for element i-1, in a way that leaves no computation to reorder or batch. Depending on what your goal is with these values, a good option may be to find an alternative function that gets the job done and is more readily vectorised.

    That does not mean, however, that this function cannot be improved and sped up.

    Initial ideas: Binary search

    First, the cumsum operation can be vectorised; doing so removes a whole bunch of Python function calls and does all the work in C.

    Next, instead of doing a linear search for the next location where the bar overflows, we can make it a binary search. NumPy has an implementation of that in np.searchsorted.

    def _binary_search_get_bar_index(cumsum, threshold):
      bar_index = 0
      covered_count = 0  # number of rows already assigned to a bar
      covered_sum = 0    # cumsum value at the end of the last closed bar
      result = np.zeros_like(cumsum)
      l = len(cumsum)
      while True:
        # first index at which the running total reaches covered_sum + threshold
        bar_last = np.searchsorted(cumsum, covered_sum + threshold)
        result[covered_count:min(bar_last + 1, l)] = bar_index
        bar_index += 1
        covered_count = bar_last + 1
        if covered_count >= l:
          break
        covered_sum = cumsum[bar_last]
      return result
    
    
    def binary_search(df, threshold):
      cumsum = df["value"].cumsum()
      df['bar_index'] = _binary_search_get_bar_index(cumsum.values, threshold)
      return df
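
    As a quick illustration (my own snippet, not part of the original answer), the stackoverflow / 2023-06-01 slice of the question's data comes out exactly as desired:

    sample = pd.DataFrame({"value": [5, 10, 10]})
    binary_search(sample, 10)["bar_index"].tolist()  # [0, 0, 1]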
    

    Numba

    Before jumping into benchmarking, I thought I'd also implement your original function (shuffled around a bit) and decorate it with numba.njit. This compiles the Python function to machine code, almost as if pandas had implemented it for you in C.

    from numba import njit

    @njit
    def _numba_get_bar_index(cumsum, threshold):
      covered_sum = 0  # cumsum value at the end of the last closed bar
      bar_index = 0
      result = np.zeros_like(cumsum)
      for i in range(len(cumsum)):
        result[i] = bar_index
        if cumsum[i] >= covered_sum + threshold:
          # this row closes the current bar; the next row starts a new one
          bar_index += 1
          covered_sum = cumsum[i]
      return result
    
    
    def numba_f(df, threshold):
      cumsum = df["value"].cumsum()
      df['bar_index'] = _numba_get_bar_index(cumsum.values, threshold)
      return df
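
    One practical note of mine (not from the original answer): the first call to an njit-decorated function also pays the JIT compilation cost, so it is worth triggering compilation once before timing anything:

    # warm-up call so that compilation time does not end up in the measurements
    _ = _numba_get_bar_index(np.array([1, 2, 3]), 10)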
    

    Setup

    I verified that both of these solutions give exactly the desired output on your dataset in the question.
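
    That check could look roughly like this (my sketch, not from the original answer; it reinstates the question's grouping and assumes df is the original frame built in the question):

    for f in (binary_search, numba_f):
      out = df.groupby(["identifier", "date"], group_keys=False).apply(lambda g: f(g.copy(), 10))
      assert (out["bar_index"].sort_index() == df["desired_output"]).all()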

    I benchmarked on a DataFrame with 100 000 rows, running on an M1 Pro Mac.

    The data was generated as below. I omitted irrelevant columns. I also omitted the grouping, since it is not relevant to the question either: your original function simply receives a DataFrame and only reads the value column.

    long_data = pd.DataFrame({'value': np.random.default_rng().poisson(8, 100000)})
    # long_data.head() -> [6, 14, 8, 4, 7, 8, 6, 5, 4, 10]
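
    Here, original stands for the question's loop-based aggregate_bars run on the whole frame; the answer does not show that wrapper, so the following is only my assumption of roughly what it looks like:

    # My assumption of the `original` used in the timings below:
    # the question's loop applied to the whole frame, without grouping.
    def original(df, threshold):
      df = df.copy()
      df["bar_index"] = np.nan
      return aggregate_bars(df, threshold)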
    

    Results

    %timeit original(long_data, 10)
    7 s ± 49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    %timeit binary_search(long_data, 10)
    82.2 ms ± 176 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    
    %timeit numba_f(long_data, 10)
    433 µs ± 5.74 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
    

    It turns out that simply using numba can give you really strong gains (over four orders of magnitude in this case). That is the magic of keeping data close together in cache and of not jumping around in memory for function calls and pointer dereferences.

    Since the overall cost in big-O terms is always going to be dominated by computing the cumsum, which both approaches need, I can't imagine a scenario where the binary search would win out. It is also unlikely that you could get anything significantly better than the numba result.