I’m working on a dataset with Polars (Python), and I’m stumped on a “rolling” grouping operation. The data looks like this:
| updated_at | last_trade_ts | ask_price | ask_qty | bid_price | bid_qty |
|---|---|---|---|---|---|
| 2023-12-20 15:30:54 CET | 2023-12-20 15:30:42 CET | 114.2 | 1.2 | 109.49 | 0.1 |
| 2023-12-20 15:31:38 CET | 2023-12-20 15:30:42 CET | 112.0 | 15.2 | 109.49 | 0.1 |
| 2023-12-20 15:31:44 CET | 2023-12-20 15:30:42 CET | 112.0 | 13.2 | 109.49 | 0.1 |
| 2023-12-20 15:31:58 CET | 2023-12-20 15:31:56 CET | 112.0 | 1.2 | 109.49 | 0.1 |
| 2023-12-20 15:33:14 CET | 2023-12-20 15:31:56 CET | 112.0 | 1.2 | 109.49 | 0.1 |
| 2023-12-20 15:33:27 CET | 2023-12-20 15:31:56 CET | 112.0 | 1.2 | 109.49 | 0.1 |
| 2023-12-20 15:33:36 CET | 2023-12-20 15:31:56 CET | 112.0 | 1.2 | 109.49 | 0.1 |
| 2023-12-20 15:33:47 CET | 2023-12-20 15:31:56 CET | 112.0 | 1.2 | 109.49 | 0.1 |
What I want to do is aggregate the data in 5-minute windows whose start times come from the `updated_at` column, but only for rows where a new value appears in the `last_trade_ts` column.
For example, in the fragment above I want
`updated_at BETWEEN '2023-12-20 15:30:54' AND '2023-12-20 15:35:54'`
for the first group, and
`updated_at BETWEEN '2023-12-20 15:31:58' AND '2023-12-20 15:36:58'`
for the second.
Of course, one possible solution would be to apply the `.rolling()` method over `updated_at` and then keep only the first row of each subsequent grouping by `last_trade_ts`. However, the dataset is quite large, and this approach involves a lot of unnecessary computation.
Is there a way to do something similar to what `.rolling()` does, but starting only from a subset of indices?
P.S. I’m open to solutions using other tools, if needed. If someone has a solution in R, I can also consider migrating to that.
I can't come up with a solution that uses `.group_by_dynamic()`, because you can only use a single column to drive both the `every` and `period` parameters.
One possible way of doing it is a more classical, pre-window-function, SQL-like approach using `DataFrame.join()`.
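For reference, a reproducible version of the sample frame from the question (naive datetimes; the CET marker is dropped):

import polars as pl

df = pl.DataFrame(
    {
        "updated_at": [
            "2023-12-20 15:30:54", "2023-12-20 15:31:38", "2023-12-20 15:31:44",
            "2023-12-20 15:31:58", "2023-12-20 15:33:14", "2023-12-20 15:33:27",
            "2023-12-20 15:33:36", "2023-12-20 15:33:47",
        ],
        "last_trade_ts": ["2023-12-20 15:30:42"] * 3 + ["2023-12-20 15:31:56"] * 5,
        "ask_price": [114.2, 112.0, 112.0, 112.0, 112.0, 112.0, 112.0, 112.0],
        "ask_qty": [1.2, 15.2, 13.2, 1.2, 1.2, 1.2, 1.2, 1.2],
        "bid_price": [109.49] * 8,
        "bid_qty": [0.1] * 8,
    }
).with_columns(pl.col("updated_at", "last_trade_ts").str.to_datetime())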
So, first we create a separate DataFrame that gives us the list of start times of the desired groups. To do that, we use `Expr.rle_id()`, which produces an index column that increases by 1 every time `last_trade_ts` changes:
(
    df
    .with_columns(pl.col("last_trade_ts").rle_id().alias("i"))
)
┌─────────────────────┬─────────────────────┬───────────┬─────┐
│ updated_at ┆ last_trade_ts ┆ ask_price ┆ i │
│ --- ┆ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] ┆ f64 ┆ u32 │
╞═════════════════════╪═════════════════════╪═══════════╪═════╡
│ 2023-12-20 15:30:54 ┆ 2023-12-20 15:30:42 ┆ 114.2 ┆ 0 │
│ 2023-12-20 15:31:38 ┆ 2023-12-20 15:30:42 ┆ 112.0 ┆ 0 │
│ 2023-12-20 15:31:44 ┆ 2023-12-20 15:30:42 ┆ 112.0 ┆ 0 │
│ 2023-12-20 15:31:58 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:14 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:27 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:36 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:47 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
└─────────────────────┴─────────────────────┴───────────┴─────┘
Now, we don't actually need this column in the output; we only want to use it as a grouping key:
df_groups = (
    df
    .group_by(pl.col("last_trade_ts").rle_id())
    .agg(pl.col("updated_at").first())
    .drop("last_trade_ts")
)
┌─────────────────────┐
│ updated_at │
│ --- │
│ datetime[μs] │
╞═════════════════════╡
│ 2023-12-20 15:31:58 │
│ 2023-12-20 15:30:54 │
└─────────────────────┘
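Note that `group_by` doesn't maintain group order by default, which is why the two start times come out swapped above. That doesn't affect the join later on, but if you want `df_groups` in chronological order you can ask for it:

df_groups = (
    df
    .group_by(pl.col("last_trade_ts").rle_id(), maintain_order=True)
    .agg(pl.col("updated_at").first())
    .drop("last_trade_ts")
)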
Alternatively, you can use `DataFrame.unique()`:
df_groups = (
    df
    .unique("last_trade_ts", keep="first")
    .select("updated_at")
)
┌─────────────────────┐
│ updated_at │
│ --- │
│ datetime[μs] │
╞═════════════════════╡
│ 2023-12-20 15:30:54 │
│ 2023-12-20 15:31:58 │
└─────────────────────┘
Now what we want to do is join `df_groups` to `df` on the condition `df.updated_at BETWEEN df_groups.updated_at AND df_groups.updated_at + 5 minutes`.
Unfortunately, Polars is not great at inequality joins, so you can either do a `cross` join followed by a `.filter()`:
(
    df_groups
    .join(df, how="cross")
    .filter(
        pl.col("updated_at_right") >= pl.col("updated_at"),
        pl.col("updated_at_right") <= pl.col("updated_at") + pl.duration(minutes=5),
    )
    .group_by("updated_at")
    .agg(pl.col("updated_at_right").max())
)
┌─────────────────────┬─────────────────────┐
│ updated_at ┆ updated_at_right │
│ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] │
╞═════════════════════╪═════════════════════╡
│ 2023-12-20 15:31:58 ┆ 2023-12-20 15:33:47 │
│ 2023-12-20 15:30:54 ┆ 2023-12-20 15:33:47 │
└─────────────────────┴─────────────────────┘
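The `max()` above is only there to show each window's bounds; the same cross-join skeleton carries whatever aggregations you actually need. A sketch (the column choices and aggregations here are just an illustration):

(
    df_groups
    .join(df, how="cross")
    .filter(
        pl.col("updated_at_right") >= pl.col("updated_at"),
        pl.col("updated_at_right") <= pl.col("updated_at") + pl.duration(minutes=5),
    )
    .group_by("updated_at")
    .agg(
        pl.col("ask_price").mean().alias("avg_ask_price"),
        pl.col("bid_price").mean().alias("avg_bid_price"),
        pl.col("ask_qty").sum().alias("total_ask_qty"),
    )
)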
Alternatively, you can use `duckdb` and plain SQL to get the same result:
import duckdb

duckdb.sql("""
    select
        df_groups.updated_at as start,
        max(df.updated_at) as end
    from df_groups
    inner join df
        on df.updated_at >= df_groups.updated_at
        and df.updated_at <= df_groups.updated_at + interval 5 minutes
    group by
        df_groups.updated_at
""").pl()
┌─────────────────────┬─────────────────────┐
│ start ┆ end │
│ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] │
╞═════════════════════╪═════════════════════╡
│ 2023-12-20 15:31:58 ┆ 2023-12-20 15:33:47 │
│ 2023-12-20 15:30:54 ┆ 2023-12-20 15:33:47 │
└─────────────────────┴─────────────────────┘
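As a side note, newer Polars versions (1.7+) added `DataFrame.join_where()` for inequality joins, which avoids materializing the full cross join. A minimal sketch, assuming such a version is available (the left column is renamed up front so the predicates stay unambiguous):

(
    df_groups
    .rename({"updated_at": "start"})
    .join_where(
        df,
        pl.col("updated_at") >= pl.col("start"),
        pl.col("updated_at") <= pl.col("start") + pl.duration(minutes=5),
    )
    .group_by("start")
    .agg(pl.col("updated_at").max().alias("end"))
)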