Search code examples
pythonpandasdataframewindow-functions

how to compare 12 consecutive rows in a dataframe


In a pandas.DataFrame, I have a column df['d_max']. I want to compare 12 values at a time and if all 12 values are the same, then I need to flag it in the next column. currently, I have written the following code, and it's working fine. I wonder if there is any other better/more efficient way of doing the same. Please note, I am comparing the 12th value with previous 11 values in the column.

 # Check if the maximum value is repeated 12 times in succession  

 df['is_peak'] = ((df['d_max'].shift(-1) == df['d_max']) &
                     (df['d_max'].shift(-2) == df['d_max']) &
                     (df['d_max'].shift(-3) == df['d_max']) &
                     (df['d_max'].shift(-4) == df['d_max']) &
                     (df['d_max'].shift(-5) == df['d_max']) &
                     (df['d_max'].shift(-6) == df['d_max']) &
                     (df['d_max'].shift(-7) == df['d_max']) &
                     (df['d_max'].shift(-8) == df['d_max']) &
                     (df['d_max'].shift(-9) == df['d_max']) &
                     (df['d_max'].shift(-10) == df['d_max']) &
                     (df['d_max'].shift(-11) == df['d_max']))

Solution

  • You can use pandas.Series.rolling to solve this, together with a window function. Window functions are available since pandas version 1.3.0 and are very well suited for looking at cumulations (e.g. cumsum,cumproduct,all-time-mean...) or for sliding window aggregations (mean of last N values).

    df['is_peak'] = df['d_max'].rolling(12, closed='left').apply(lambda x:len(set(x))==1)
    

    We need to supply closed='left' to make it exclude the current value of the window as it's being dragged over your data.

    For example:

    import pandas as pd
    import numpy as np
    data = {'d_max': [2, 3] + [6] * 12 + [1, 5, 5, 5]}
    df = pd.DataFrame(data)
    df['is_peak'] = df['d_max'][::-1].rolling(12, closed='left').apply(lambda x:len(set(x))==1)[::-1]
    df
    
        d_max   is_peak
    0   2   0.0
    1   3   1.0
    2   6   0.0
    3   6   0.0
    4   6   0.0
    5   6   0.0
    6   6   NaN
    7   6   NaN
    8   6   NaN
    9   6   NaN
    10  6   NaN
    11  6   NaN
    12  6   NaN
    13  6   NaN
    14  1   NaN
    15  5   NaN
    16  5   NaN
    17  5   NaN
    

    There is a performance difference between different approaches. Providing a custom lambda function in apply is incredibly slow compared to other approaches so we need to get creative to get something more performant than your original code.

    If all values in a Series are equal, its variance is 0. We can apply this by using Rolling.var():

    df["d_max"].rolling(12, closed="left").var() == 0
    

    enter image description here

    import numpy as np
    import pandas as pd
    
    from performance_measurement import run_performance_comparison
    
    
    def shift_approach(df):
        df["is_peak"] = (
            (df["d_max"].shift(1) == df["d_max"])
            & (df["d_max"].shift(2) == df["d_max"])
            & (df["d_max"].shift(3) == df["d_max"])
            & (df["d_max"].shift(4) == df["d_max"])
            & (df["d_max"].shift(5) == df["d_max"])
            & (df["d_max"].shift(6) == df["d_max"])
            & (df["d_max"].shift(7) == df["d_max"])
            & (df["d_max"].shift(8) == df["d_max"])
            & (df["d_max"].shift(9) == df["d_max"])
            & (df["d_max"].shift(10) == df["d_max"])
            & (df["d_max"].shift(11) == df["d_max"])
        )
        return df
    
    
    
    
    def rolling_approach_var(df):
        df["is_peak"] = (
            df["d_max"].rolling(12, closed="left").var()==0
        )
        return df
    
    
    def mozway_rolling_approach(df):
        df["is_peak"] = (
            df.rolling(12)["d_max"].apply(lambda x: x.iloc[:-1].eq(x.iloc[-1]).all()).eq(1)
        )
        return df
    
    
    from numpy.lib.stride_tricks import sliding_window_view as swv
    
    
    def mozway_striding_approach(df):
        a = swv(df["d_max"], 12)
        df.loc[df.index[-len(a) :], "is_peak"] = (a[:, [0]] == a[:, 1:]).all(axis=1)
        return df
    
    
    approaches = [
        rolling_approach_var,
        shift_approach,
        mozway_striding_approach,
    ]
    
    
    def generate_data(dataset_size):
        """Generates some random data of size dataset size, while making sure some runs of 12 equal numbers are included"""
        data = {"d_max": np.random.randint(1, 10, dataset_size)}
        df = pd.DataFrame(data)
        insert_indices = np.random.choice(
            range(dataset_size - 11), size=dataset_size // 12, replace=False
        )
        for idx in insert_indices:
            df.iloc[idx : idx + 12] = df.iloc[idx]
    
        return [df]
    
    
    def generate_same_data(dataset_size):
        """Generates a dataframe where the series always has the same value"""
        data = {"d_max": [12] * dataset_size}
        df = pd.DataFrame(data)
    
        return [df]
    
    
    run_performance_comparison(
        approaches,
        [1000, 3000, 5000, 10000, 30000, 50000, 100000, 200000, 300000,500000,1000000],
        setup=generate_same_data,
        title="Same data",
        number_of_repetitions=1,
    )
    

    Profiling code:

    import timeit
    import matplotlib.pyplot as plt
    from typing import List, Dict, Callable
    
    from contextlib import contextmanager
    
    
    @contextmanager
    def data_provider(data_size, setup=lambda N: N, teardown=lambda: None):
        data = setup(data_size)
        yield data
        teardown()
    
    
    def run_performance_comparison(approaches: List[Callable],
                                   data_size: List[int],
                                   setup=lambda N: N,
                                   teardown=lambda: None,
                                   number_of_repetitions=5, title='Performance Comparison',data_name='N'):
        approach_times: Dict[Callable, List[float]] = {approach: [] for approach in approaches}
    
        for N in data_size:
            with data_provider(N, setup, teardown) as data:
                for approach in approaches:
                    approach_time = timeit.timeit(lambda: approach(*data), number=number_of_repetitions)
                    approach_times[approach].append(approach_time)
    
        for approach in approaches:
            plt.plot(data_size, approach_times[approach], label=approach.__name__)
        plt.yscale('log')
        plt.xscale('log')
        plt.xlabel(data_name)
        plt.ylabel('Execution Time (seconds)')
        plt.title(title)
        plt.legend()
        plt.show()