I'm trying to write a method for a features pipeline that returns a polars expression. The method should take a column name as a string and an integer number of days. I want to perform a rolling count on that column using a window equal to the number of days.
There doesn't seem to be a rolling_count expression, so I attempted to use rolling_sum_by, to no avail:
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        pl.lit(1)
        .rolling_sum_by(window_size=f"{days}d", by="date_time")
        .over(col)
        .fill_null(0)
    )
I also tried this method, which was closer but still didn't work in all cases:
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        pl.col(col)
        .cum_count()
        .over(col, (pl.col("date_time") - pl.col("date_time").min()).dt.days() % days == 0)
        .fill_null(0)
    )
Is there any way to achieve this by returning an expression? Or will I have to act on the DataFrame directly, maybe by using rolling?
Following the suggestion from @jqurious, I used .clip and was able to achieve the desired outcome without acting on the DataFrame:
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        # clip(1, 1) turns every non-null value into 1, so the rolling sum is a count
        pl.col(col).clip(1, 1)
        .rolling_sum(window_size=f"{days}d", by="date_time")
        .over(col)
        .fill_null(0)
    )
EDIT
I managed to do the same thing, but counting only the rows where a condition is true, by doing the following:
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        # condition is a boolean expression defined elsewhere; matching rows contribute 1, others 0
        pl.when(condition).then(1).otherwise(0)
        .rolling_sum(window_size=f"{days}d", by="date_time")
        .over(col)
        .fill_null(0)
    )