Tags: python, dataframe, python-polars

Create a new column with the first value that matches a condition


I have a DataFrame similar to this:

import polars as pl

df = pl.from_repr("""
┌──────┬───────┐
│ Time ┆ Value │
│ ---  ┆ ---   │
│ i64  ┆ i64   │
╞══════╪═══════╡
│ 1    ┆ 100   │
│ 2    ┆ 75    │
│ 3    ┆ 70    │
│ 4    ┆ 105   │
│ 5    ┆ 140   │
│ 6    ┆ 220   │
│ 7    ┆ 65    │
│ 8    ┆ 180   │
│ 9    ┆ 150   │
└──────┴───────┘
""")

(Note that it comes sorted by Time)

I need to create a new column, NewColumn, that yields the next Time where Value is lower than the current Value.

EDIT: it is important to specify that my original dataset has more than 10 million rows, and even though the original requirement is the one above, some operations could exceed RAM very quickly. To balance this, it would be acceptable to introduce a lookahead limit when testing the condition: for example, yielding the next Time where Value is lower than the current Value within the next 100 Time steps.

Something like this:

|  Time  | Value  | NewColumn |
|   1    |   100  |       2   | >> next Time with Value lower than 100
|   2    |    75  |       3   | >> next Time with Value lower than 75
|   3    |    70  |       7   | >> next Time with Value lower than 70
|   4    |   105  |       7   | >> next Time with Value lower than 105
|   5    |   140  |       7   | >> next Time with Value lower than 140
|   6    |   220  |       7   | >> next Time with Value lower than 220
|   7    |    65  |    null   | >> next Time with Value lower than 65
|   8    |   180  |       9   | >> next Time with Value lower than 180
|   9    |   150  |    null   | >> next Time with Value lower than 150

So far, my approach has been to create a new temporary column holding a slice of Value from the next row up to the last row, like this:

|  Time  | Value  |  Slice_of_Value   | 
|   1    |   100  | [75, 70, … 150]   | 
|   2    |    75  | [70, 105, … 150]  |
|   3    |    70  | [105, 140, … 150] |
|   4    |   105  | [140, 220, … 150] |
|   5    |   140  | [220, 65, … 150]  |
|   6    |   220  | [65, 180, 150]    |
|   7    |    65  | [180, 150]        |
|   8    |   180  | [150]             |
|   9    |   150  | []                |

Then I try to infer the position of the first match satisfying the condition "lower than column Value", resulting in something like this:

|  Time  | Value  |  Slice_of_Value   | Position | 
|   1    |   100  | [75, 70, … 150]   |        0 | 
|   2    |    75  | [70, 105, … 150]  |        0 |
|   3    |    70  | [105, 140, … 150] |        3 |
|   4    |   105  | [140, 220, … 150] |        2 |
|   5    |   140  | [220, 65, … 150]  |        1 |
|   6    |   220  | [65, 180, 150]    |        0 |
|   7    |    65  | [180, 150]        |     null |
|   8    |   180  | [150]             |        0 |
|   9    |   150  | []                |     null |

Now I've run into some issues:

Step 1: Slice_of_Value

To get the Slice_of_Value column, here is what I tried at first:

df = df.with_columns(
  pl.col("Value").slice(pl.col("Time"), 9).implode().alias("Slice_of_Value")
)

but it seems that it is not possible to use pl.col("") as part of .slice()... So I resorted to something like this instead:

df = df.with_columns(
    # One lag column per lookahead step: lag_i is Value shifted up by i rows.
    pl.col("Value").shift(-i).alias(f"lag_{i}") for i in range(1, 9)
).with_columns(
    # Pack the lag columns into a single list column.
    pl.concat_list([f"lag_{i}" for i in range(1, 9)]).alias("Slice_of_Value")
)

So far so good.
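
One caveat: shift(-i) pads with nulls, so the trailing rows actually hold null-padded lists (e.g. [150, null, …] for Time 8) rather than the progressively shorter lists sketched above. If that matters, the padding can be stripped inside the list column:

df = df.with_columns(
    # Remove the null padding introduced by shift(-i).
    pl.col("Slice_of_Value").list.drop_nulls()
)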

Step 2: Position

df = df.with_columns(
    pl.col("Slice_of_Value")
      .list.eval(pl.arg_where(pl.element() < pl.col("Value")))
      .list.eval(pl.element().first())
      .list.eval(pl.element().drop_nulls())
      .explode()
      .add(pl.col("Time") + 1)
      .alias("NewColumn")
)

Unfortunately this piece of code does not work because named columns are not allowed in list.eval... So I'm kind of hitting a wall now. I don't know if my whole approach is wrong or if I have missed something in the docs.

Any help or suggestion is greatly appreciated :)
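
EDIT: one workaround for the named-column restriction in list.eval (the same idea SOLUTION 2 below ends up using) is to make the comparison value part of the list itself: prepend Value to Slice_of_Value, then compare each element against pl.element().first(). A sketch:

df = df.with_columns(
    # Prepend Value so it becomes element 0 of the list; inside list.eval,
    # pl.element().first() then stands in for the current row's Value.
    pl.concat_list([pl.col("Value"), pl.col("Slice_of_Value")]).alias("tmp")
).with_columns(
    pl.col("tmp")
      .list.eval(pl.arg_where(pl.element() < pl.element().first()))
      .list.first()
      # The match index counts from element 0 (= Value itself) while the
      # slice starts at Time+1, so Time + index is the matching Time.
      .add(pl.col("Time"))
      .alias("NewColumn")
)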

EDIT: BENCHMARK RESULTS

So far I have tried 4 solutions on my real dataset. Here are my benchmarks:

SOLUTION 1: 0.83s for 10M rows

lookahead = 10
df = dfSource.with_columns(
    pl.when(pl.col("Value").shift(-i) < pl.col("Value"))
      .then(pl.col("Time").shift(-i))
      .alias(f"time_{i}")
    for i in range(1, lookahead + 1)
).with_columns(
    NewColumn=pl.coalesce(pl.col(f"time_{i}") for i in range(1, lookahead + 1))
).drop(f"time_{i}" for i in range(1, lookahead + 1)).collect()

SOLUTION 2: 3.41s for 10M rows

df.rolling("Time", period=f"{df.height}i", offset="0i").agg(
    x=pl.arg_where(pl.col("Value") < pl.col("Value").first()).first() - 1
)

Gave incorrect results, so I transformed it to this:

df = df.group_by_dynamic(
    "Time",
    every="1i",
    period="10i",  # window = the current row plus the next 9
    include_boundaries=False,
    closed="left",
).agg(
    pl.col("Value").alias("Slices")
).select(
    pl.col("Time"),
    pl.col("Slices")
      # element 0 of each window is the current Value itself, so
      # the first index returned by arg_where is the offset from Time
      .list.eval(pl.arg_where(pl.element() < pl.element().first()))
      .list.first()
      .add(pl.col("Time"))
      .alias("NewColumn"),
).collect()

SOLUTION 3: (exceeded RAM)

df.with_columns(position=pl.col("Value").implode()).with_columns(
    next_time=pl.col("position")
    .list.gather(pl.int_ranges(pl.col("Time") - 1, df.height))
    .list.eval(pl.arg_where(pl.element() < pl.element().first()))
    .list.first()
    + pl.col('Time') # +1 and -1 cancel out here.
)

Exceeded RAM because pl.col("Value").implode() broadcasts the entire 10M-element column as a list onto every row; the per-row suffix slices then hold on the order of n²/2 ≈ 5 × 10¹³ values in total, far more than memory allows...

So I accepted SOLUTION 1, which produced the fastest results on the real dataset and, IMHO, the cleanest code (no lists, more concise, no further joins needed...).

Finally, here are some further benchmarks after increasing the lookahead size.

Lookahead Size  |  SOLUTION 1 Time  |  SOLUTION 2 Time  |
          10    |           0.83s   |         3.41s     |
          20    |           1.24s   |         3.83s     |
          50    |           2.24s   |         5.34s     |
         100    |           4.45s   |         8.10s     |
         200    |           8.58s   |        17.86s     |
         500    |          20.24s   |        66.93s     |
        1000    |          70.63s   |       108.12s     |
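
For anyone reproducing these numbers: the post doesn't include the benchmark harness, but a minimal version could look like this (the dataset construction is a placeholder with the same shape as mine):

import time
import polars as pl

# Hypothetical stand-in for the real 10M-row dataset.
n = 10_000_000
dfSource = pl.select(
    Time=pl.int_range(1, n + 1),
    Value=pl.int_range(1, n + 1).shuffle(seed=0),
).lazy()

lookahead = 10
start = time.perf_counter()
out = dfSource.with_columns(
    pl.when(pl.col("Value").shift(-i) < pl.col("Value"))
      .then(pl.col("Time").shift(-i))
      .alias(f"time_{i}")
    for i in range(1, lookahead + 1)
).with_columns(
    NewColumn=pl.coalesce(pl.col(f"time_{i}") for i in range(1, lookahead + 1))
).drop(f"time_{i}" for i in range(1, lookahead + 1)).collect()
print(f"SOLUTION 1, lookahead={lookahead}: {time.perf_counter() - start:.2f}s")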

Solution

  • Here's a way...

    lookahead = 8
    (
        df
        .with_columns(
            # time{i}: the Time i rows ahead, kept only when that row's
            # Value is lower than the current Value; null otherwise.
            pl.when(pl.col("Value").shift(-i) < pl.col("Value"))
              .then(pl.col("Time").shift(-i))
              .alias(f"time{i}")
            for i in range(1, lookahead + 1)
        )
        .with_columns(
            # Coalesce keeps the first non-null per row, i.e. the nearest match.
            NewColumn=pl.coalesce(pl.col(f"time{i}") for i in range(1, lookahead + 1))
        )
        .drop([f"time{i}" for i in range(1, lookahead + 1)])
    )
    

    The starting point, or strategy, is that nested lists are difficult to work with. When facing nested lists, you usually want to find a way to explode them or otherwise avoid working with them directly. Applied to this scenario: don't make lists. Making lists is usually the wrong path unless there's a specific function that operates easily on a list.

    What I did instead was to take your starting point of generating a fixed lookahead, but instead of having each column be the shifted value, each column holds the conditional you're checking for: on a match it returns the Time value, otherwise null. The next step coalesces all those columns, keeping the first non-null per row. Lastly, it drops all the lookahead columns. Concretely, the generator expands to the sketch below.
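
    An illustrative expansion (lookahead=2; with this short horizon, Time 3 would get null, since its first lower Value sits 4 rows ahead):

    (
        df
        .with_columns(
            time1=pl.when(pl.col("Value").shift(-1) < pl.col("Value"))
                    .then(pl.col("Time").shift(-1)),
            time2=pl.when(pl.col("Value").shift(-2) < pl.col("Value"))
                    .then(pl.col("Time").shift(-2)),
        )
        .with_columns(NewColumn=pl.coalesce("time1", "time2"))
        .drop("time1", "time2")
    )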