
Polars Time Series: Path-Dependent Event Outcome Calculation

In the demo DataFrame I have three events:

import polars as pl

df = pl.DataFrame(
    {
        "timestamp": [1, 2, 3, 4, 5, 6, 7, 8],
        "threshold": [8, None, None, None, 5, None, None, 8],
        "value": [2, 3, 4, 5, 6, 7, 8, 9],
        "event": [1, 0, 0, 0, 1, 0, 0, 1],
        "start_ts": [1, None, None, None, 5, None, None, 8],
        "end_ts": [6, None, None, None, 8, None, None, 8],
        "event_id": [0, None, None, None, 1, None, None, 2],
    }
).with_columns(event_span=pl.col("end_ts") - pl.col("start_ts"))  # number of timestamps covered


shape: (8, 8)
┌───────────┬───────────┬───────┬───────┬──────────┬────────┬──────────┬────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ start_ts ┆ end_ts ┆ event_id ┆ event_span │
│ ---       ┆ ---       ┆ ---   ┆ ---   ┆ ---      ┆ ---    ┆ ---      ┆ ---        │
│ i64       ┆ i64       ┆ i64   ┆ i64   ┆ i64      ┆ i64    ┆ i64      ┆ i64        │
╞═══════════╪═══════════╪═══════╪═══════╪══════════╪════════╪══════════╪════════════╡
│ 1         ┆ 8         ┆ 2     ┆ 1     ┆ 1        ┆ 6      ┆ 0        ┆ 5          │
│ 2         ┆ null      ┆ 3     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 3         ┆ null      ┆ 4     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 4         ┆ null      ┆ 5     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 5         ┆ 5         ┆ 6     ┆ 1     ┆ 5        ┆ 8      ┆ 1        ┆ 3          │
│ 6         ┆ null      ┆ 7     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 7         ┆ null      ┆ 8     ┆ 0     ┆ null     ┆ null   ┆ null     ┆ null       │
│ 8         ┆ 8         ┆ 9     ┆ 1     ┆ 8        ┆ 8      ┆ 2        ┆ 0          │
└───────────┴───────────┴───────┴───────┴──────────┴────────┴──────────┴────────────┘
  • timestamp is the timestamp in the real world.
  • threshold is the level that value must reach or exceed during the event span.
  • value is the observed value at each timestamp; values may repeat.
  • event is a binary column indicating whether a given timestamp generates an event.
  • start_ts is the starting timestamp of an event. For example, a start_ts of 1 means that the event starts at the end of timestamp 1, i.e. at the beginning of timestamp 2.
  • end_ts is the ending timestamp of an event.
  • event_id is a unique identifier for each event.
  • event_span is the number of timestamps an event spans (end_ts - start_ts).

Problem: I want to identify:

  1. event outcome: a binary value indicating whether value reaches or exceeds the event's threshold at any timestamp within the event span.
  2. event outcome timestamp: the first timestamp at which value reaches the threshold.
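Before optimizing, it can help to have a brute-force oracle to compare any Polars result against on small data. The sketch below is plain Python, not a Polars solution; it assumes the span semantics described above (an event covers start_ts+1 through end_ts) and treats an empty span as an undefined (null) outcome, matching the desired output for event 2. The function name event_outcomes is hypothetical.

```python
from typing import Optional


def event_outcomes(rows: list[dict]) -> dict[int, tuple[Optional[int], Optional[int]]]:
    """Brute force: map event_id -> (event_outcome, event_outcome_timestamp)."""
    value_at = {r["timestamp"]: r["value"] for r in rows}
    results = {}
    for r in rows:
        if r["event"] != 1:
            continue
        # Assumed semantics: the event covers timestamps start_ts+1 .. end_ts.
        span = range(r["start_ts"] + 1, r["end_ts"] + 1)
        if len(span) == 0:
            # Empty span (event 2): outcome is left undefined (null).
            results[r["event_id"]] = (None, None)
            continue
        # First timestamp in the span where value reaches the threshold, if any.
        hit = next((t for t in span if value_at[t] >= r["threshold"]), None)
        results[r["event_id"]] = (0 if hit is None else 1, hit)
    return results


# The demo data from the question, as a list of row dicts.
rows = [
    {"timestamp": t, "threshold": th, "value": v, "event": e,
     "start_ts": s, "end_ts": en, "event_id": i}
    for t, th, v, e, s, en, i in zip(
        [1, 2, 3, 4, 5, 6, 7, 8],
        [8, None, None, None, 5, None, None, 8],
        [2, 3, 4, 5, 6, 7, 8, 9],
        [1, 0, 0, 0, 1, 0, 0, 1],
        [1, None, None, None, 5, None, None, 8],
        [6, None, None, None, 8, None, None, 8],
        [0, None, None, None, 1, None, None, 2],
    )
]
print(event_outcomes(rows))
```

This does the same amount of work as materializing every path (one check per covered timestamp), so it is only meant for validating a faster implementation, not for production data.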

Additional Note:

  • Event 0 spans [2, 3, 4, 5, 6], event 1 spans [6, 7, 8], and event 2 spans nothing.
  • Events never extend beyond the available data (i.e., end_ts <= max(timestamp)).

Desired Output:

shape: (8, 10)
┌───────────┬───────────┬───────┬───────┬───┬──────────┬────────────┬───────────────┬──────────────┐
│ timestamp ┆ threshold ┆ value ┆ event ┆ … ┆ event_id ┆ event_span ┆ event_outcome ┆ event_outcom │
│ ---       ┆ ---       ┆ ---   ┆ ---   ┆   ┆ ---      ┆ ---        ┆ ---           ┆ e_timestamp  │
│ i64       ┆ i64       ┆ i64   ┆ i64   ┆   ┆ i64      ┆ i64        ┆ i32           ┆ ---          │
│           ┆           ┆       ┆       ┆   ┆          ┆            ┆               ┆ i64          │
╞═══════════╪═══════════╪═══════╪═══════╪═══╪══════════╪════════════╪═══════════════╪══════════════╡
│ 1         ┆ 8         ┆ 2     ┆ 1     ┆ … ┆ 0        ┆ 5          ┆ 0             ┆ null         │
│ 2         ┆ null      ┆ 3     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 3         ┆ null      ┆ 4     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 4         ┆ null      ┆ 5     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 5         ┆ 5         ┆ 6     ┆ 1     ┆ … ┆ 1        ┆ 3          ┆ 1             ┆ 6            │
│ 6         ┆ null      ┆ 7     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 7         ┆ null      ┆ 8     ┆ 0     ┆ … ┆ null     ┆ null       ┆ null          ┆ null         │
│ 8         ┆ 8         ┆ 9     ┆ 1     ┆ … ┆ 2        ┆ 0          ┆ null          ┆ null         │
└───────────┴───────────┴───────┴───────┴───┴──────────┴────────────┴───────────────┴──────────────┘

My current solution materializes the full path of values for every event, which is very resource-intensive, especially when events overlap:

event_df = (
    df
    .filter(pl.col("event") == 1, pl.col("event_span") > 0)
    .with_columns(  # Map event full path
        event_timestamps=pl.int_ranges(pl.col("start_ts") + 1, pl.col("end_ts") + 1))
    .explode("event_timestamps")  # Generate event full path
    .join(
        df.select(pl.col("timestamp"), pl.col("value"))
        .rename({"timestamp": "event_timestamps", "value": "path_value"}),
        on="event_timestamps", how="left")  # Get the value of the full path
    .with_columns(  # Map event outcome based on threshold
        event_outcome=pl.when(pl.col("path_value") >= pl.col("threshold")).then(1))
    .with_columns(  # Get event outcome timestamp
        event_outcome_timestamp=pl.when(pl.col("event_outcome") == 1)
        .then(pl.col("event_timestamps")))
    .with_columns(  # Map event outcome to the event start timestamp (min skips nulls)
        pl.col("event_outcome", "event_outcome_timestamp").min().over("event_id"))
    .with_columns(  # Take care of the events that have not exceeded the threshold
        pl.col("event_outcome").fill_null(0).cast(pl.Int32))
    .unique("event_id", keep="first", maintain_order=True)  # one row per event
    .select(pl.col("event_id", "event_outcome", "event_outcome_timestamp"))
)

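shape: (2, 3)
┌──────────┬───────────────┬─────────────────────────┐
│ event_id ┆ event_outcome ┆ event_outcome_timestamp │
│ ---      ┆ ---           ┆ ---                     │
│ i64      ┆ i32           ┆ i64                     │
╞══════════╪═══════════════╪═════════════════════════╡
│ 0        ┆ 0             ┆ null                    │
│ 1        ┆ 1             ┆ 6                       │
└──────────┴───────────────┴─────────────────────────┘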
df = df.join(event_df, on="event_id", how="left")


  • Polars recently added inequality joins via join_where, which suit interval problems like this one: each event row is joined only to the timestamps inside its span, and the matches are then aggregated per event.

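    aggregated = (
        df.filter(pl.col("event") == 1)
        .join_where(
            df.select("timestamp", "value"),
            # event starts after start_ts, and goes until end_ts
            pl.col("timestamp_right") > pl.col("start_ts"),
            pl.col("timestamp_right") <= pl.col("end_ts"),
        )
        .group_by("event_id")
        .agg(
            event_outcome=(pl.col("value_right") >= pl.col("threshold")).any().cast(pl.Int32),
            event_outcome_timestamp=pl.col("timestamp_right")
                .filter(pl.col("value_right") >= pl.col("threshold"))
                .min(),
        )
        .sort("event_id")
    )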
    # aggregated
    # shape: (2, 3)
    # ┌──────────┬───────────────┬─────────────────────────┐
    # │ event_id ┆ event_outcome ┆ event_outcome_timestamp │
    # │ ---      ┆ ---           ┆ ---                     │
    # │ i64      ┆ i32           ┆ i64                     │
    # ╞══════════╪═══════════════╪═════════════════════════╡
    # │ 0        ┆ 0             ┆ null                    │
    # │ 1        ┆ 1             ┆ 6                       │
    # └──────────┴───────────────┴─────────────────────────┘
    df.join(aggregated, on="event_id", how="left")
    # same as desired outcome

    To better understand how this works, inspect the intermediate result of join_where, before the group_by.