python-polars

Nested time-based groupby operations/sub-groups without map_groups()?


I want to understand the idiomatic Polars way to create temporal sub-groups out of the groups produced by a rolling() operation.

I'm looking to do this while keeping things parallel, i.e. without using map_groups() (see that approach) and without creating and merging secondary dataframes.
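For contrast, the map_groups() approach I'm trying to avoid looks something like this sketch (my own approximation, assuming the example frame below is loaded as df): each rolling window is materialised as a DataFrame and re-grouped in Python, one window at a time.

import polars as pl

# Sketch of the avoided approach: run group_by_dynamic() on each rolling
# window's DataFrame via a Python callback, window by window (not parallel).
slow = (
    df.rolling(index_column="date", period="30m", closed="both")
      .map_groups(
          lambda window: window.sort("date")
                               .group_by_dynamic("date", every="10m", closed="right")
                               .agg(pl.col("price").mean())
      )
)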

Example input:

┌─────┬─────────────────────┬───────┐
│ row ┆ date                ┆ price │
│ --- ┆ ---                 ┆ ---   │
│ i64 ┆ datetime[μs]        ┆ i64   │
╞═════╪═════════════════════╪═══════╡
│ 1   ┆ 2022-01-01 10:00:00 ┆ 10    │
│ 2   ┆ 2022-01-01 10:05:00 ┆ 20    │
│ 3   ┆ 2022-01-01 10:10:00 ┆ 30    │
│ 4   ┆ 2022-01-01 10:15:00 ┆ 40    │
│ 5   ┆ 2022-01-01 10:20:00 ┆ 50    │
│ 6   ┆ 2022-01-01 10:25:00 ┆ 60    │
│ 7   ┆ 2022-01-01 10:30:00 ┆ 70    │
│ 8   ┆ 2022-01-01 10:35:00 ┆ 80    │
│ 9   ┆ 2022-01-01 10:40:00 ┆ 90    │
│ 10  ┆ 2022-01-01 10:45:00 ┆ 100   │
│ 11  ┆ 2022-01-01 10:50:00 ┆ 110   │
│ 12  ┆ 2022-01-01 10:55:00 ┆ 120   │
│ 13  ┆ 2022-01-01 11:00:00 ┆ 130   │
└─────┴─────────────────────┴───────┘

Desired output:

┌─────┬─────────────────────┬───────┬──────────────────────────────────┐
│ row ┆ date                ┆ price ┆ 10_min_groups_mean_price_history │
│ --- ┆ ---                 ┆ ---   ┆ ---                              │
│ i64 ┆ datetime[μs]        ┆ i64   ┆ list[i64]                        │
╞═════╪═════════════════════╪═══════╪══════════════════════════════════╡
│ 1   ┆ 2022-01-01 10:00:00 ┆ 10    ┆ [10]                             │
│ 2   ┆ 2022-01-01 10:05:00 ┆ 20    ┆ [15]                             │
│ 3   ┆ 2022-01-01 10:10:00 ┆ 30    ┆ [25, 10]                         │
│ 4   ┆ 2022-01-01 10:15:00 ┆ 40    ┆ [35, 15]                         │
│ 5   ┆ 2022-01-01 10:20:00 ┆ 50    ┆ [45, 25, 10]                     │
│ 6   ┆ 2022-01-01 10:25:00 ┆ 60    ┆ [55, 35, 15]                     │
│ 7   ┆ 2022-01-01 10:30:00 ┆ 70    ┆ [65, 45, 25]                     │
│ 8   ┆ 2022-01-01 10:35:00 ┆ 80    ┆ [75, 55, 35]                     │
│ 9   ┆ 2022-01-01 10:40:00 ┆ 90    ┆ [85, 65, 45]                     │
│ 10  ┆ 2022-01-01 10:45:00 ┆ 100   ┆ [95, 75, 55]                     │
│ 11  ┆ 2022-01-01 10:50:00 ┆ 110   ┆ [105, 85, 65]                    │
│ 12  ┆ 2022-01-01 10:55:00 ┆ 120   ┆ [115, 95, 75]                    │
│ 13  ┆ 2022-01-01 11:00:00 ┆ 130   ┆ [125, 105, 85]                   │
└─────┴─────────────────────┴───────┴──────────────────────────────────┘

What is happening above?

  • A rolling window is applied over the dataframe, producing a window per row.
  • Each window includes all rows within the last 30min (including the current row).
  • Each 30min window is then divided into 10min sub-groups.
  • The mean price is calculated for each 10min sub-group.
  • All mean prices from the sub-groups are returned as a list (most recent first) in the "10_min_groups_mean_price_history" column.

Worked example (using row 5):

  • The rolling window for row 5 captures the previous 30min of data, which is rows 1 to 5.
  • These rows are sub-grouped into 10min windows, creating three sub-groups that capture rows [[5,4],[3,2],[1]].
  • The mean price of the rows in each sub-group is calculated and produced as a list → [45, 25, 10] (reproduced in the sketch below).
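To make the arithmetic concrete, here is a minimal sketch (assuming the example frame above is loaded as df) that rebuilds row 5's list by hand:

import polars as pl
from datetime import datetime

# Row 5's rolling window: [09:50, 10:20], i.e. period="30m", closed="both".
window = df.filter(
    pl.col("date").is_between(
        datetime(2022, 1, 1, 9, 50), datetime(2022, 1, 1, 10, 20), closed="both"
    )
)

# Bucket the window into 10min sub-groups and average each one.
means = (
    window.sort("date")
          .group_by_dynamic("date", every="10m", closed="right")
          .agg(pl.col("price").mean())
)

print(means["price"].reverse().to_list())  # [45.0, 25.0, 10.0]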

Mental model:

I'm conceptualising this as treating each window from a rolling() operation as a dataframe that can be computed on as needed (in this case by performing a group_by_dynamic() operation on it, with the intent of returning aggregations on those sub-groups as a list), but I'm not sure that is the right way to think about it.

If the sub-group key were categorical, this would be a simple case of using over() (sketched below); however, I'm not aware of an equivalent when the requirement is to sub-group by a time window.
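For reference, the categorical case I have in mind (with a hypothetical bucket column) would just be:

import polars as pl

# Hypothetical categorical case: a discrete key per row means a plain
# window function computes the per-group aggregation in one parallel pass.
df_cat = pl.DataFrame({
    "bucket": ["a", "a", "b", "b", "b"],
    "price": [10, 20, 30, 40, 50],
})
print(df_cat.with_columns(pl.col("price").mean().over("bucket").alias("bucket_mean")))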

I'm also under the impression that this operation should be parallelisable, as each window is independent of the others (it's just more calculation steps), but please point out if there's a reason it can't be.

Thanks in advance!

Full dummy data set:

If you want to run this with a realistically sized dataset, you can use:

import numpy as np
import polars as pl
from datetime import datetime, timedelta

# One row per second between 09:00:00 and 16:59:59.
df_dummy = pl.DataFrame({
    'date' : pl.datetime_range(
        datetime(2000, 1, 1, 9),
        datetime(2000, 1, 1, 16, 59, 59),
        timedelta(seconds=1),
        eager=True
    )
})
# Random prices between 50 and 95.
df_dummy = df_dummy.with_columns(
    pl.Series(np.random.uniform(.5, .95, len(df_dummy)) * 100).alias('price')
)

Other ways that people might ask this question (for others searching):

  • group_by_dynamic() within rolling()
  • How to access polars RollingGroupBy[Dataframe] Object
  • Treat each rolling() window as a dataframe to aggregate on
  • Nested dataframes within groupby context
  • Nested groupby contexts

Solution

  • Could you .explode() the .rolling()

    And then use the resulting column for your .group_by_dynamic()?

    (df.rolling(index_column="date", period="30m", closed="both")
       .agg(pl.col("date").alias("window"))
       .explode("window"))
    
    shape: (70, 2)
    ┌─────────────────────┬─────────────────────┐
    │ date                ┆ window              │
    │ ---                 ┆ ---                 │
    │ datetime[μs]        ┆ datetime[μs]        │
    ╞═════════════════════╪═════════════════════╡
    │ 2022-01-01 10:00:00 ┆ 2022-01-01 10:00:00 │
    │ 2022-01-01 10:05:00 ┆ 2022-01-01 10:00:00 │
    │ 2022-01-01 10:05:00 ┆ 2022-01-01 10:05:00 │
    │ 2022-01-01 10:10:00 ┆ 2022-01-01 10:00:00 │
    │ 2022-01-01 10:10:00 ┆ 2022-01-01 10:05:00 │
    │ 2022-01-01 10:10:00 ┆ 2022-01-01 10:10:00 │
    │ 2022-01-01 10:15:00 ┆ 2022-01-01 10:00:00 │
    │ 2022-01-01 10:15:00 ┆ 2022-01-01 10:05:00 │
    │ 2022-01-01 10:15:00 ┆ 2022-01-01 10:10:00 │
    │ 2022-01-01 10:15:00 ┆ 2022-01-01 10:15:00 │
    │ ...                 ┆ ...                 │
    │ 2022-01-01 10:55:00 ┆ 2022-01-01 10:45:00 │
    │ 2022-01-01 10:55:00 ┆ 2022-01-01 10:50:00 │
    │ 2022-01-01 10:55:00 ┆ 2022-01-01 10:55:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 10:30:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 10:35:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 10:40:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 10:45:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 10:50:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 10:55:00 │
    │ 2022-01-01 11:00:00 ┆ 2022-01-01 11:00:00 │
    └─────────────────────┴─────────────────────┘
    

    Something along the lines of:

    (df.rolling(index_column="date", period="30m", closed="both")
       .agg(pl.col("date").alias("window"), pl.col("price"))
       .explode("window", "price")
       .group_by_dynamic(index_column="window", every="10m", closed="right", group_by="date")
       .agg(pl.col("price"))  # or pl.col("price").mean() for the mean per sub-group
       .group_by("date", maintain_order=True)
       .all()
    )
    
    shape: (13, 3)
    ┌─────────────────────┬─────────────────────────────────────┬──────────────────────────────────┐
    │ date                ┆ window                              ┆ price                            │
    │ ---                 ┆ ---                                 ┆ ---                              │
    │ datetime[μs]        ┆ list[datetime[μs]]                  ┆ list[list[i64]]                  │
    ╞═════════════════════╪═════════════════════════════════════╪══════════════════════════════════╡
    │ 2022-01-01 10:00:00 ┆ [2022-01-01 09:50:00]               ┆ [[10]]                           │
    │ 2022-01-01 10:05:00 ┆ [2022-01-01 09:50:00, 2022-01-01... ┆ [[10], [20]]                     │
    │ 2022-01-01 10:10:00 ┆ [2022-01-01 09:50:00, 2022-01-01... ┆ [[10], [20, 30]]                 │
    │ 2022-01-01 10:15:00 ┆ [2022-01-01 09:50:00, 2022-01-01... ┆ [[10], [20, 30], [40]]           │
    │ 2022-01-01 10:20:00 ┆ [2022-01-01 09:50:00, 2022-01-01... ┆ [[10], [20, 30], [40, 50]]       │
    │ 2022-01-01 10:25:00 ┆ [2022-01-01 09:50:00, 2022-01-01... ┆ [[10], [20, 30], ... [60]]       │
    │ 2022-01-01 10:30:00 ┆ [2022-01-01 09:50:00, 2022-01-01... ┆ [[10], [20, 30], ... [60, 70]]   │
    │ 2022-01-01 10:35:00 ┆ [2022-01-01 10:00:00, 2022-01-01... ┆ [[20, 30], [40, 50], ... [80]]   │
    │ 2022-01-01 10:40:00 ┆ [2022-01-01 10:00:00, 2022-01-01... ┆ [[30], [40, 50], ... [80, 90]]   │
    │ 2022-01-01 10:45:00 ┆ [2022-01-01 10:10:00, 2022-01-01... ┆ [[40, 50], [60, 70], ... [100]]  │
    │ 2022-01-01 10:50:00 ┆ [2022-01-01 10:10:00, 2022-01-01... ┆ [[50], [60, 70], ... [100, 110]] │
    │ 2022-01-01 10:55:00 ┆ [2022-01-01 10:20:00, 2022-01-01... ┆ [[60, 70], [80, 90], ... [120]]  │
    │ 2022-01-01 11:00:00 ┆ [2022-01-01 10:20:00, 2022-01-01... ┆ [[70], [80, 90], ... [120, 130]] │
    └─────────────────────┴─────────────────────────────────────┴──────────────────────────────────┘
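
    To land on the exact desired output (means only, most recent first), a possible final step building on the above (my sketch, not part of the original answer; the column name is taken from the question):

    history = (
        df.rolling(index_column="date", period="30m", closed="both")
          .agg(pl.col("date").alias("window"), pl.col("price"))
          .explode("window", "price")
          .group_by_dynamic(index_column="window", every="10m", closed="right", group_by="date")
          .agg(pl.col("price").mean())  # mean per 10min sub-group
          .group_by("date", maintain_order=True)
          .agg(pl.col("price").reverse().alias("10_min_groups_mean_price_history"))  # most recent first
    )

    If the original row/price columns are needed alongside the history, joining back on date would do it, at the cost of the secondary-frame merge the question hoped to avoid.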
    

    Edit: Removed the unneeded .join() as per @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ's help.