python, pandas, numpy, time-series, pandas-resample

Efficient resampling of time series


I have been working on resampling time-series data using Pandas. It works well and gives the required results. However, the performance is a little slow for my current requirements.

Problem: I have minute data that I need to resample to many frequencies, such as 5min, 15min, and 3H. Pandas resampling works fine when the dataset is small, but when I resample a large number of records (10 days of data for 1000 symbols), its performance degrades significantly.

Tried:

  1. I tried implementing the resampling with NumPy arrays, but it is even slower (see below), probably due to how I implemented it.
  2. I used apply with the resample method in pandas, with a custom aggregation function written in Cython (new to me, by the way). Performance was worse than plain resample with agg.
  3. Similar to #2, I experimented with numba.jit, but saw no improvement.
  4. I split the data and used multiprocessing for resampling (see the sketch after this list). It improves performance by ~50%, but the overhead and compute requirements increase significantly.
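
For reference, the multiprocessing variant from #4 looked roughly like this (a simplified sketch; it reuses the pandas_resample function from the sample code below, and the per-symbol chunking here is a simplification of how I actually split the work):

from multiprocessing import Pool

import numpy as np
import pandas as pd


def resample_in_parallel(df: pd.DataFrame, workers: int = 4) -> pd.DataFrame:
    # Split the symbols into one chunk per worker so each worker
    # resamples an independent subset of the data.
    sids = df['sid'].unique()
    chunks = [df[df['sid'].isin(part)] for part in np.array_split(sids, workers)]
    with Pool(workers) as pool:
        parts = pool.map(pandas_resample, chunks)  # pandas_resample as defined below
    return pd.concat(parts, ignore_index=True)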

Observations:

  1. When I remove the sum aggregation (for volume) from the pandas version, performance improves a little (~15%).
  2. NumPy is way slower than expected.

Here is the sample code I used for checking:
from datetime import datetime
from time import time

import numpy as np
import pandas as pd

symbols = 1000
start = datetime(2023, 1, 1)
end = datetime(2023, 1, 2)
data_cols = ['open', 'high', 'low', 'close', 'volume']
agg_props = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
base, sample = '1min', '5min'


def pandas_resample(df: pd.DataFrame):
    # Baseline: sort, index by timestamp, then resample within each symbol group.
    df.sort_values(['sid', 'timestamp'], inplace=True)
    df = df.set_index('timestamp')
    re_df = df.groupby('sid').resample(sample, label='left', closed='left').agg(agg_props).reset_index()
    return re_df


def numpy_resample(arr):
    # Build (start, end) boundaries for each target interval.
    intervals = pd.date_range(arr[:, 1].min(), arr[:, 1].max(), freq=sample)
    intervals = list(zip(intervals[:-1], intervals[1:]))
    data = []
    groups = np.unique(arr[:, 0])
    for _group in groups:
        # All rows belonging to the current symbol.
        group_data = arr[arr[:, 0] == _group, :]
        for _start, _end in intervals:
            # Rows of this symbol falling inside the current interval.
            _data_filter = (_start <= group_data[:, 1]) & (group_data[:, 1] < _end)
            _interval_data = group_data[_data_filter]
            _interval_agg = [_group, _start]
            _skip = len(_interval_data) == 0
            # Aggregate each OHLCV column (array columns 2-6).
            for _val, _key in [['open', 2], ['high', 3], ['low', 4], ['close', 5], ['volume', 6]]:
                _col_data = _interval_data[:, _key]
                if not _skip:
                    if _val == 'open': _key_val = _col_data[0]
                    if _val == 'high': _key_val = _col_data.max()
                    if _val == 'low': _key_val = _col_data.min()
                    if _val == 'close': _key_val = _col_data[-1]
                    if _val == 'volume': _key_val = _col_data.sum()
                else:
                    _key_val = None
                _interval_agg.append(_key_val)
            data.append(_interval_agg)
    return data


if __name__ == '__main__':
    timestamps = pd.date_range(start, end, freq=base)
    candles = pd.DataFrame({'timestamp': pd.DatetimeIndex(timestamps), **{_col: np.random.randint(50, 150, len(timestamps)) for _col in data_cols}})
    symbol_id = pd.DataFrame({'sid': np.random.randint(1000, 2000, symbols)})
    # Cross join symbols with candles via a constant dummy key.
    candles['id'] = 1
    symbol_id['id'] = 1
    data_df = symbol_id.merge(candles, on='id').drop(columns=['id'])
    print(len(data_df), "\n", data_df.head(3))

    st1 = time()
    resampled_df = pandas_resample(data_df.copy())
    print('pandas', time() - st1)

    st2 = time()
    numpy_resample(data_df.values)
    print('numpy', time() - st2)

Output

pandas 3.5319528579711914
numpy 93.10612797737122

Please suggest any other approach or implementation that could give better performance.

Thanks in advance!


Solution

  • I think you can get a significant speedup by using a pd.Grouper object inside a groupby operation on your DataFrame: it lets you do the resampling at the same time as the groupby.
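
    The core of the idea is just this (a minimal sketch; df and agg_props are as defined in the question):

    import pandas as pd

    # One groupby that buckets by symbol and by 5-minute window at once.
    grouper = ['sid', pd.Grouper(key='timestamp', freq='5min')]
    re_df = df.groupby(grouper, as_index=False).agg(agg_props)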

    Here's a comparison code:

    from datetime import datetime
    from time import time
    
    import numpy as np
    import pandas as pd
    
    symbols = 1000
    start = datetime(2023, 1, 1)
    end = datetime(2023, 1, 2)
    data_cols = ['open', 'high', 'low', 'close', 'volume']
    agg_props = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
    base, sample = '1min', '5min'
    grouper = ['sid', pd.Grouper(key='timestamp', freq=sample)]
    
    def pandas_resample(df: pd.DataFrame) -> pd.DataFrame:
        df.sort_values(['sid', 'timestamp'], inplace=True)
        df = df.set_index('timestamp')
        re_df = df.groupby('sid').resample(sample, label='left', closed='left').agg(agg_props).reset_index()
        return re_df
    
    def pandas_resample2(df: pd.DataFrame) -> pd.DataFrame:
        # Group by symbol and time bucket in one pass; no sorting or indexing needed.
        re_df = df.groupby(grouper, as_index=False).agg(agg_props)
        return re_df
    
    if __name__ == '__main__':
        timestamps = pd.DataFrame({'timestamp': pd.date_range(start, end, freq=base)})
        symbol_id = pd.DataFrame({'sid': np.random.randint(1000, 2000, symbols)})
        indexes = pd.merge(timestamps, symbol_id, how='cross')
        values = pd.DataFrame(np.random.randint(50, 150, size=(len(indexes), len(data_cols))), columns=data_cols)
        data_df = pd.concat([indexes, values], axis=1)
        print(len(data_df), "\n", data_df)
    
        data_df_resampled = data_df.copy()
        st1 = time()
        resampled_df1 = pandas_resample(data_df_resampled)
        duration_1 = time() - st1
        print('pandas 1', duration_1)
        #print(resampled_df1)
    
        data_df_resampled = data_df.copy()
        st2 = time()
        resampled_df2 = pandas_resample2(data_df_resampled)
        duration_2 = time() - st2
        print('pandas 2', duration_2)
        #print(resampled_df2)
    
        print('Are both methods equivalent?', resampled_df1.equals(resampled_df2))
        print(f'Method suggested is {duration_1/duration_2:0.2f} times faster')
    
    

    I get a speedup of around 25 times. Here is an example of the output I get on my computer:

    1441000 
              timestamp   sid  open  high  low  close  volume
    0       2023-01-01  1259    58   107  122     56     144
    1       2023-01-01  1722    84   125  144     85     125
    2       2023-01-01  1959    84    90  145     83     144
    3       2023-01-01  1650   136   113   61     63      83
    4       2023-01-01  1556    68   124   78    112     123
    ...            ...   ...   ...   ...  ...    ...     ...
    1440995 2023-01-02  1406    87    82  121    103     123
    1440996 2023-01-02  1597   110    67   70    113     120
    1440997 2023-01-02  1024   131    88  149     70     136
    1440998 2023-01-02  1288   136   146   72     84     129
    1440999 2023-01-02  1678   118    83  134     99     148
    
    [1441000 rows x 7 columns]
    pandas 1 3.4833083152770996
    pandas 2 0.13672590255737305
    Are both methods equivalent? True
    Method suggested is 25.48 times faster
    

    What do you think?
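
    Since you mentioned resampling to several frequencies (5min, 15min, 3H), the same pattern extends naturally. A possible sketch (collecting the results in a dict is just my own convention):

    # Resample the same minute data to several target frequencies,
    # reusing data_df and agg_props from the comparison code above.
    results = {}
    for freq in ['5min', '15min', '3H']:
        grouper = ['sid', pd.Grouper(key='timestamp', freq=freq)]
        results[freq] = data_df.groupby(grouper, as_index=False).agg(agg_props)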

    By the way, you don't need to sort your values beforehand if you intend to do a groupby operation: the grouping itself does not require sorted input. (Note that the 'first' and 'last' aggregations still depend on the row order within each group, and here the rows are already in timestamp order.)
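
    A quick sanity check (reusing data_df, grouper and agg_props from the comparison code):

    # Sorting first changes nothing: groupby orders its output by the
    # group keys itself. The 'first'/'last' aggregations still depend on
    # the row order within each group, which is already chronological
    # in this dataset.
    a = data_df.groupby(grouper, as_index=False).agg(agg_props)
    b = data_df.sort_values(['sid', 'timestamp']).groupby(grouper, as_index=False).agg(agg_props)
    print(a.equals(b))  # expected: True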