Tags: python, python-polars, rolling-computation

Rolling KPI calculations in polars, index not visible


How do I add rolling KPIs to the original DataFrame in Polars? When I use group_by, I don't see an index, so I can't join the result back. I want to keep all the original columns of the DataFrame intact and add the rolling KPIs as new columns.

Pandas code:

groups_df = df[mask_for_filter].groupby(['group_identifier'])
rolling_kpi = groups_df[['col_1', 'col_2']].rolling(15, min_periods=1, center=True).median().reset_index(level='group_identifier').sort_index()

df.loc[mask_for_filter, 'col_1_median'] = rolling_kpi['col_1']
df.loc[mask_for_filter, 'col_2_median'] = rolling_kpi['col_2']
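
For reference, here is what the pandas call computes, written out in plain Python. Within each group_identifier (after filtering), each row gets the median of a centred window: the row itself plus up to 7 rows on either side for a window of 15. Because of min_periods=1, the window is simply truncated at the edges of the group. This is a minimal sketch using only the standard library. The function names are hypothetical, and it assumes an odd window size, as in the question.

```python
import statistics
from collections import defaultdict


def centered_rolling_median(values, window=15):
    """Median over a centred window, truncated at the edges (like min_periods=1).
    Assumes an odd window size."""
    half = window // 2
    out = []
    for i in range(len(values)):
        lo = max(0, i - half)
        hi = min(len(values), i + half + 1)
        out.append(statistics.median(values[lo:hi]))
    return out


def grouped_rolling_median(keys, values, window=15):
    """Apply centered_rolling_median within each key, keeping the original row order."""
    positions = defaultdict(list)
    for pos, key in enumerate(keys):
        positions[key].append(pos)
    result = [None] * len(values)
    for idx in positions.values():
        medians = centered_rolling_median([values[p] for p in idx], window)
        for p, m in zip(idx, medians):
            result[p] = m
    return result


# Hypothetical toy data; a window of 3 keeps the arithmetic easy to follow.
keys = ["A", "B", "A", "B", "A"]
vals = [1.0, 10.0, 3.0, 20.0, 2.0]
print(grouped_rolling_median(keys, vals, window=3))  # [2.0, 15.0, 2.0, 15.0, 2.5]
```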

Polars:

df = df.filter(mask_for_filter).group_by('group_identifier').agg(
    col_1_median=pl.col('col_1').rolling_median(15, min_periods=1, center=True),
    col_2_median=pl.col('col_2').rolling_median(15, min_periods=1, center=True))

Reproducible example: result_df should be the same as df, just with the extra rolling-median columns, but the code below doesn't produce that. Also, there is no index, so I can't merge/join the result back.

import polars as pl
import numpy as np

np.random.seed(0)
data = {
    'group_identifier': np.random.choice(['A', 'B', 'C'], 100),
    'col_1': np.random.randn(100).round(2),
    'col_2': np.random.randn(100).round(2),
    'other_col': np.random.randn(100).round(2)
}

df = pl.DataFrame(data)

mask_for_filter = df['col_1'] > 0

result_df = df.filter(mask_for_filter).group_by('group_identifier').agg(
    col_1_median=pl.col('col_1').rolling_median(15, min_periods=1, center=True),
    col_2_median=pl.col('col_2').rolling_median(15, min_periods=1, center=True)
)

Solution

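  • It looks like you don't need group_by() here. Instead, run rolling_median() as a window expression:

    • over() restricts the calculation to rows with the same group_identifier and returns one value per row, so no rows are lost.
    • name.suffix() names the new columns after the original ones (col_1_median, col_2_median).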

    If you only need filtered rows:

    (
        df
        .filter(mask_for_filter)
        .with_columns(
            pl.col("col_1", "col_2")
            .rolling_median(15, min_periods=1, center=True)
            .over("group_identifier")
            .name.suffix("_median")
        )
    )
    
    ┌──────────────────┬───────┬───────┬───────────┬──────────────┬──────────────┐
    │ group_identifier ┆ col_1 ┆ col_2 ┆ other_col ┆ col_1_median ┆ col_2_median │
    │ ---              ┆ ---   ┆ ---   ┆ ---       ┆ ---          ┆ ---          │
    │ str              ┆ f64   ┆ f64   ┆ f64       ┆ f64          ┆ f64          │
    ╞══════════════════╪═══════╪═══════╪═══════════╪══════════════╪══════════════╡
    │ B                ┆ 0.01  ┆ 1.68  ┆ 1.12      ┆ 0.83         ┆ -0.46        │
    │ B                ┆ 0.37  ┆ -0.26 ┆ 0.04      ┆ 0.85         ┆ -0.66        │
    │ A                ┆ 0.72  ┆ -0.38 ┆ 0.47      ┆ 0.93         ┆ -0.44        │
    │ A                ┆ 0.36  ┆ -0.51 ┆ -0.4      ┆ 0.86         ┆ -0.5         │
    ...
    └──────────────────┴───────┴───────┴───────────┴──────────────┴──────────────┘
    

    Or, if you need the result in your original DataFrame:

    • when/then() twice: the outer one assigns the rolling median only to rows where col_1 > 0, and the inner one sets the remaining rows to null so they are left out of the rolling median calculation.
    (
        df
        .with_columns(
            pl.when(pl.col("col_1") > 0).then(
                pl.when(pl.col("col_1") > 0).then(pl.col("col_1", "col_2"))
                .rolling_median(15, min_periods=1, center=True)
                .over("group_identifier")
            )
            .name.suffix("_median")
        )
    )
    

    If you want to add more rolling statistics, you could generalize it (although I'm not sure it's readable enough for production):

    (
        df
        .with_columns(
            pl.when(pl.col("col_1") > 0).then(
                transform(
                    pl.when(pl.col("col_1") > 0).then(pl.col("col_1", "col_2")),
                    15, min_periods=1, center=True
                )
                .over("group_identifier")
            )
            .name.suffix(suffix)
            for transform, suffix in [
                (pl.Expr.rolling_median, "_median"),
                (pl.Expr.rolling_mean, "_mean"),
            ]
        )
    )