
Polars: Rolling groups with starting indices set by a different column


I’m working on a dataset with Polars (Python), and I’m stumped on a “rolling” grouping operation. The data looks like this:

    updated_at               last_trade_ts            ask_price  ask_qty  bid_price  bid_qty
    2023-12-20 15:30:54 CET  2023-12-20 15:30:42 CET  114.2      1.2      109.49     0.1
    2023-12-20 15:31:38 CET  2023-12-20 15:30:42 CET  112.0      15.2     109.49     0.1
    2023-12-20 15:31:44 CET  2023-12-20 15:30:42 CET  112.0      13.2     109.49     0.1
    2023-12-20 15:31:58 CET  2023-12-20 15:31:56 CET  112.0      1.2      109.49     0.1
    2023-12-20 15:33:14 CET  2023-12-20 15:31:56 CET  112.0      1.2      109.49     0.1
    2023-12-20 15:33:27 CET  2023-12-20 15:31:56 CET  112.0      1.2      109.49     0.1
    2023-12-20 15:33:36 CET  2023-12-20 15:31:56 CET  112.0      1.2      109.49     0.1
    2023-12-20 15:33:47 CET  2023-12-20 15:31:56 CET  112.0      1.2      109.49     0.1

What I want to do is aggregate the data in 5-minute windows whose start times are taken from the updated_at column, but a new window should start only at rows where last_trade_ts takes a new value.

For example, in the fragment above, I want to have

updated_at BETWEEN '2023-12-20 15:30:54' AND '2023-12-20 15:35:54'

for the first group, and

updated_at BETWEEN '2023-12-20 15:31:58' AND '2023-12-20 15:36:58'

for the second.

Of course, one possible solution would be to apply the .rolling() method on updated_at and then select only the first row from a subsequent grouping by last_trade_ts. However, the dataset is quite large, and this approach involves a lot of unnecessary calculations.

Is there a way to perform something similar to what .rolling() does, but with a subset of starting indices?

P.S. I’m open to solutions using other tools, if needed. If someone has a solution in R, I can also consider migrating to that.


Solution

  • I can't come up with a solution that uses .group_by_dynamic(), because it lays out window starts on a regular grid (every) along a single index column, so there is no way to supply an arbitrary set of starting points.

    One possible approach is the more classical, SQL-style one from before window functions existed: a DataFrame.join().

    First, we create a separate DataFrame that lists the start moments of the desired groups. To do that, we use Expr.rle_id(), which produces an index that increases by 1 every time last_trade_ts changes:

    import polars as pl

    (
        df
        .with_columns(pl.col("last_trade_ts").rle_id().alias('i'))
    )
    ┌─────────────────────┬─────────────────────┬───────────┬─────┐
    │ updated_at          ┆ last_trade_ts       ┆ ask_price ┆ i   │
    │ ---                 ┆ ---                 ┆ ---       ┆ --- │
    │ datetime[μs]        ┆ datetime[μs]        ┆ f64       ┆ u32 │
    ╞═════════════════════╪═════════════════════╪═══════════╪═════╡
    │ 2023-12-20 15:30:54 ┆ 2023-12-20 15:30:42 ┆ 114.2     ┆ 0   │
    │ 2023-12-20 15:31:38 ┆ 2023-12-20 15:30:42 ┆ 112.0     ┆ 0   │
    │ 2023-12-20 15:31:44 ┆ 2023-12-20 15:30:42 ┆ 112.0     ┆ 0   │
    │ 2023-12-20 15:31:58 ┆ 2023-12-20 15:31:56 ┆ 112.0     ┆ 1   │
    │ 2023-12-20 15:33:14 ┆ 2023-12-20 15:31:56 ┆ 112.0     ┆ 1   │
    │ 2023-12-20 15:33:27 ┆ 2023-12-20 15:31:56 ┆ 112.0     ┆ 1   │
    │ 2023-12-20 15:33:36 ┆ 2023-12-20 15:31:56 ┆ 112.0     ┆ 1   │
    │ 2023-12-20 15:33:47 ┆ 2023-12-20 15:31:56 ┆ 112.0     ┆ 1   │
    └─────────────────────┴─────────────────────┴───────────┴─────┘
    

    We don't actually need this column in the output; we only want to use it as a grouping key:

    df_groups = (
        df  
        .group_by(pl.col("last_trade_ts").rle_id())
        .agg(pl.col("updated_at").first())
        .drop("last_trade_ts")
    )
    
    ┌─────────────────────┐
    │ updated_at          │
    │ ---                 │
    │ datetime[μs]        │
    ╞═════════════════════╡
    │ 2023-12-20 15:31:58 │
    │ 2023-12-20 15:30:54 │
    └─────────────────────┘
    

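    Alternatively, you can use DataFrame.unique(). It keeps the first row of every distinct last_trade_ts value rather than of every run, which gives the same result as long as a value never reappears after last_trade_ts has changed: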

    df_groups = (
        df
        .unique("last_trade_ts", keep="first", maintain_order=True)
        .select("updated_at")
    )
    
    ┌─────────────────────┐
    │ updated_at          │
    │ ---                 │
    │ datetime[μs]        │
    ╞═════════════════════╡
    │ 2023-12-20 15:30:54 │
    │ 2023-12-20 15:31:58 │
    └─────────────────────┘
    

    Now we want to join df_groups to df on the condition that df.updated_at lies between df_groups.updated_at and df_groups.updated_at + 5 minutes.

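    Unfortunately, Polars is not great at inequality joins, so one option is a cross join followed by .filter(). Keep in mind that the cross join materializes every (group, row) pair before filtering, i.e. (number of groups) × (number of rows) rows: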

    (    
        df_groups
        .join(df, how="cross")
        .filter(
            pl.col("updated_at_right") >= pl.col("updated_at"),
            pl.col("updated_at_right") <= pl.col("updated_at") + pl.duration(minutes=5)
        )
        .group_by("updated_at")
        .agg(pl.col("updated_at_right").max())
    )
    
    ┌─────────────────────┬─────────────────────┐
    │ updated_at          ┆ updated_at_right    │
    │ ---                 ┆ ---                 │
    │ datetime[μs]        ┆ datetime[μs]        │
    ╞═════════════════════╪═════════════════════╡
    │ 2023-12-20 15:31:58 ┆ 2023-12-20 15:33:47 │
    │ 2023-12-20 15:30:54 ┆ 2023-12-20 15:33:47 │
    └─────────────────────┴─────────────────────┘
    

    Alternatively, you can use DuckDB and compute the result with SQL (DuckDB can query the Polars DataFrames df and df_groups directly by their variable names):

    import duckdb
    
    duckdb.sql("""
        select
            df_groups.updated_at as start,
            max(df.updated_at) as end
        from df_groups
            inner join df on
                df.updated_at >= df_groups.updated_at and
                df.updated_at <= df_groups.updated_at + interval 5 minutes
        group by
            df_groups.updated_at
    """).pl()
    
    ┌─────────────────────┬─────────────────────┐
    │ start               ┆ end                 │
    │ ---                 ┆ ---                 │
    │ datetime[μs]        ┆ datetime[μs]        │
    ╞═════════════════════╪═════════════════════╡
    │ 2023-12-20 15:31:58 ┆ 2023-12-20 15:33:47 │
    │ 2023-12-20 15:30:54 ┆ 2023-12-20 15:33:47 │
    └─────────────────────┴─────────────────────┘