In a pandas.DataFrame
, I have a column df['d_max']
. I want to compare 12 values at a time and if all 12 values are the same, then I need to flag it in the next column. currently, I have written the following code, and it's working fine. I wonder if there is any other better/more efficient way of doing the same. Please note, I am comparing the 12th value with previous 11 values in the column.
# Check if the maximum value is repeated 12 times in succession
df['is_peak'] = ((df['d_max'].shift(-1) == df['d_max']) &
(df['d_max'].shift(-2) == df['d_max']) &
(df['d_max'].shift(-3) == df['d_max']) &
(df['d_max'].shift(-4) == df['d_max']) &
(df['d_max'].shift(-5) == df['d_max']) &
(df['d_max'].shift(-6) == df['d_max']) &
(df['d_max'].shift(-7) == df['d_max']) &
(df['d_max'].shift(-8) == df['d_max']) &
(df['d_max'].shift(-9) == df['d_max']) &
(df['d_max'].shift(-10) == df['d_max']) &
(df['d_max'].shift(-11) == df['d_max']))
You can use pandas.Series.rolling
to solve this, together with a window function. Window functions are available since pandas
version 1.3.0 and are very well suited for looking at cumulations (e.g. cumsum,cumproduct,all-time-mean...) or for sliding window aggregations (mean of last N
values).
df['is_peak'] = df['d_max'].rolling(12, closed='left').apply(lambda x:len(set(x))==1)
We need to supply closed='left'
to make it exclude the current value of the window as it's being dragged over your data.
For example:
import pandas as pd
import numpy as np
data = {'d_max': [2, 3] + [6] * 12 + [1, 5, 5, 5]}
df = pd.DataFrame(data)
df['is_peak'] = df['d_max'][::-1].rolling(12, closed='left').apply(lambda x:len(set(x))==1)[::-1]
df
d_max is_peak
0 2 0.0
1 3 1.0
2 6 0.0
3 6 0.0
4 6 0.0
5 6 0.0
6 6 NaN
7 6 NaN
8 6 NaN
9 6 NaN
10 6 NaN
11 6 NaN
12 6 NaN
13 6 NaN
14 1 NaN
15 5 NaN
16 5 NaN
17 5 NaN
There is a performance difference between different approaches. Providing a custom lambda function in apply
is incredibly slow compared to other approaches so we need to get creative to get something more performant than your original code.
If all values in a Series
are equal, its variance is 0. We can apply this by using Rolling.var()
:
df["d_max"].rolling(12, closed="left").var() == 0
import numpy as np
import pandas as pd
from performance_measurement import run_performance_comparison
def shift_approach(df):
df["is_peak"] = (
(df["d_max"].shift(1) == df["d_max"])
& (df["d_max"].shift(2) == df["d_max"])
& (df["d_max"].shift(3) == df["d_max"])
& (df["d_max"].shift(4) == df["d_max"])
& (df["d_max"].shift(5) == df["d_max"])
& (df["d_max"].shift(6) == df["d_max"])
& (df["d_max"].shift(7) == df["d_max"])
& (df["d_max"].shift(8) == df["d_max"])
& (df["d_max"].shift(9) == df["d_max"])
& (df["d_max"].shift(10) == df["d_max"])
& (df["d_max"].shift(11) == df["d_max"])
)
return df
def rolling_approach_var(df):
df["is_peak"] = (
df["d_max"].rolling(12, closed="left").var()==0
)
return df
def mozway_rolling_approach(df):
df["is_peak"] = (
df.rolling(12)["d_max"].apply(lambda x: x.iloc[:-1].eq(x.iloc[-1]).all()).eq(1)
)
return df
from numpy.lib.stride_tricks import sliding_window_view as swv
def mozway_striding_approach(df):
a = swv(df["d_max"], 12)
df.loc[df.index[-len(a) :], "is_peak"] = (a[:, [0]] == a[:, 1:]).all(axis=1)
return df
approaches = [
rolling_approach_var,
shift_approach,
mozway_striding_approach,
]
def generate_data(dataset_size):
"""Generates some random data of size dataset size, while making sure some runs of 12 equal numbers are included"""
data = {"d_max": np.random.randint(1, 10, dataset_size)}
df = pd.DataFrame(data)
insert_indices = np.random.choice(
range(dataset_size - 11), size=dataset_size // 12, replace=False
)
for idx in insert_indices:
df.iloc[idx : idx + 12] = df.iloc[idx]
return [df]
def generate_same_data(dataset_size):
"""Generates a dataframe where the series always has the same value"""
data = {"d_max": [12] * dataset_size}
df = pd.DataFrame(data)
return [df]
run_performance_comparison(
approaches,
[1000, 3000, 5000, 10000, 30000, 50000, 100000, 200000, 300000,500000,1000000],
setup=generate_same_data,
title="Same data",
number_of_repetitions=1,
)
Profiling code:
import timeit
import matplotlib.pyplot as plt
from typing import List, Dict, Callable
from contextlib import contextmanager
@contextmanager
def data_provider(data_size, setup=lambda N: N, teardown=lambda: None):
data = setup(data_size)
yield data
teardown()
def run_performance_comparison(approaches: List[Callable],
data_size: List[int],
setup=lambda N: N,
teardown=lambda: None,
number_of_repetitions=5, title='Performance Comparison',data_name='N'):
approach_times: Dict[Callable, List[float]] = {approach: [] for approach in approaches}
for N in data_size:
with data_provider(N, setup, teardown) as data:
for approach in approaches:
approach_time = timeit.timeit(lambda: approach(*data), number=number_of_repetitions)
approach_times[approach].append(approach_time)
for approach in approaches:
plt.plot(data_size, approach_times[approach], label=approach.__name__)
plt.yscale('log')
plt.xscale('log')
plt.xlabel(data_name)
plt.ylabel('Execution Time (seconds)')
plt.title(title)
plt.legend()
plt.show()