Tags: python, python-polars

Polars - fill null using 'rule of three' based on filtered set


Goal: I want to fill the nulls in a series by distributing the difference between the next non-null and the previous non-null value. The distribution is not linear but uses the values in another column to calculate the proportions.

Example

import polars as pl

df = pl.DataFrame({
    "id": ["a", "a", "a", "b", "b", "b", "b", "b"],
    "timestamp": ["2023-09-13 14:05:34", "2023-09-13 14:15:04", "2023-09-13 14:30:01", "2023-09-13 12:12:02", "2023-09-13 12:15:02", "2023-09-13 12:30:07", "2023-09-13 12:45:01", "2023-09-13 13:00:02"],
    "value": [10, None, 30, 5, 10, None, None, 40]
}).with_columns(
    pl.col("timestamp").str.to_datetime(),
)

shape: (8, 3)
┌─────┬─────────────────────┬───────┐
│ id  ┆ timestamp           ┆ value │
│ --- ┆ ---                 ┆ ---   │
│ str ┆ datetime[μs]        ┆ i64   │
╞═════╪═════════════════════╪═══════╡
│ a   ┆ 2023-09-13 14:05:34 ┆ 10    │
│ a   ┆ 2023-09-13 14:15:04 ┆ null  │
│ a   ┆ 2023-09-13 14:30:01 ┆ 30    │
│ b   ┆ 2023-09-13 12:12:02 ┆ 5     │
│ b   ┆ 2023-09-13 12:15:02 ┆ 10    │
│ b   ┆ 2023-09-13 12:30:07 ┆ null  │
│ b   ┆ 2023-09-13 12:45:01 ┆ null  │
│ b   ┆ 2023-09-13 13:00:02 ┆ 40    │
└─────┴─────────────────────┴───────┘

Expected output (with some intermediate columns to show how it is calculated)

shape: (8, 9)
┌─────┬─────────────────────┬───────┬───────────┬────────────┬────────────────┬─────────┬────────────┬───────┐
│ id  ┆ timestamp           ┆ value ┆ gap value ┆ gap time s ┆ gap proportion ┆ portion ┆ fill value ┆ final │
│ --- ┆ ---                 ┆ ---   ┆ ---       ┆ ---        ┆ ---            ┆ ---     ┆ ---        ┆ ---   │
│ str ┆ datetime[ns]        ┆ str   ┆ f64       ┆ f64        ┆ f64            ┆ f64     ┆ f64        ┆ f64   │
╞═════╪═════════════════════╪═══════╪═══════════╪════════════╪════════════════╪═════════╪════════════╪═══════╡
│ a   ┆ 2023-09-13 14:05:34 ┆ 10    ┆ null      ┆ null       ┆ null           ┆ null    ┆ null       ┆ 10.0  │
│ a   ┆ 2023-09-13 14:15:04 ┆ null  ┆ 20.0      ┆ 1467.0     ┆ 570.0          ┆ 7.77    ┆ 17.77      ┆ 17.77 │
│ a   ┆ 2023-09-13 14:30:01 ┆ 30    ┆ null      ┆ null       ┆ null           ┆ null    ┆ null       ┆ 30.0  │
│ b   ┆ 2023-09-13 12:12:02 ┆ 5     ┆ null      ┆ null       ┆ null           ┆ null    ┆ null       ┆ 5.0   │
│ b   ┆ 2023-09-13 12:15:02 ┆ 10    ┆ null      ┆ null       ┆ null           ┆ null    ┆ null       ┆ 10.0  │
│ b   ┆ 2023-09-13 12:30:07 ┆ null  ┆ 30.0      ┆ 2700.0     ┆ 905.0          ┆ 10.06   ┆ 20.06      ┆ 20.06 │
│ b   ┆ 2023-09-13 12:45:01 ┆ null  ┆ 30.0      ┆ 2700.0     ┆ 1799.0         ┆ 19.99   ┆ 29.99      ┆ 29.99 │
│ b   ┆ 2023-09-13 13:00:02 ┆ 40    ┆ null      ┆ null       ┆ null           ┆ null    ┆ null       ┆ 40.0  │
└─────┴─────────────────────┴───────┴───────────┴────────────┴────────────────┴─────────┴────────────┴───────┘

How this calculation works
I will take group 'b' as an example.

  • There are 2 rows with nulls that need filling.
  • The difference between the next and the previous non-null value is 30 (40 - 10).
  • The time difference between the next and the previous non-null value is 2700 seconds (13:00:02 - 12:15:02).
  • For the first blank row, the elapsed time is 905 seconds (12:30:07 - 12:15:02), so this row gets the portion 30 * (905 / 2700) assigned (10.06).
  • So when filling it, the fill value is 10 + 10.06.
  • The next blank row gets a portion of 30 * (1799 / 2700) (19.99), so its fill value is 10 + 19.99 (a short arithmetic check follows this list).
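
If it helps, here is the same arithmetic as a tiny plain-Python check (the numbers are copied straight from the example above):

prev_value, next_value = 10, 40            # bounding non-null values for group 'b'
gap_value = next_value - prev_value        # 30
gap_time = 2700                            # seconds between 12:15:02 and 13:00:02
for elapsed in (905, 1799):                # seconds from 12:15:02 to each null row
    portion = gap_value * (elapsed / gap_time)
    print(round(prev_value + portion, 2))  # 20.06, then 29.99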

Thanks for the help. I am new to both Polars and Python so my SQL-primed mind is still wrapping around all this.

Personally I feel it would be a great addition to fill_null to be able to apply a rule of three, using a different column for the proportioning.

Thanks


Solution

    (
        df
        .join_asof(
            df
                .filter(pl.col('value').is_not_null())
                .with_columns(
                    gap_time=(pl.col('timestamp')-pl.col('timestamp').shift().over('id'))
                                .dt.seconds(),
                    prev_good_time=pl.col('timestamp').shift().over('id'),
                    prev_good_value=pl.col('value').shift().over('id')
                    )
                .drop('value'),
            on='timestamp', by='id', strategy='forward'
            )
        .with_columns(
            gap_value=pl.when(pl.col('value').is_null())
                      .then((pl.col('value')-((pl.col('value')
                                              .forward_fill().shift()
                                              ).over('id'))).backward_fill()),
            gap_time=pl.when(pl.col('value').is_null())
                        .then(pl.col('gap_time')),
            gap_proportion=pl.when(pl.col('value').is_null())
                        .then((pl.col('timestamp')-pl.col('prev_good_time')).dt.seconds()),
                      )
        .with_columns(
            portion=pl.col('gap_value')*(pl.col('gap_proportion')/pl.col('gap_time'))
        )
        .with_columns(
            fill_value=pl.col('prev_good_value')+pl.col('portion')
        )
        .select(
            'id','timestamp',
            value=pl.when(pl.col('value').is_null())
                .then(pl.col('fill_value'))
                .otherwise(
                    pl.col('value')
                )
        )
    )
    

    The first thing we do is a join_asof to a filtered version of the original. That lets us calculate the time between valid values, as well as set aside the most recent timestamp associated with a non-null value, and that value itself. The asof part of the join means that it joins on a time column but rolls forward (or backward) until it finds the next (or previous) matching time, after first matching exactly on the by column(s).
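
    In case the asof mechanics are unfamiliar, here is a tiny illustration; the frames, values and the marker column are made up purely for this demo:

    left = pl.DataFrame({'id': ['a', 'a'], 't': [1, 5]})
    right = pl.DataFrame({'id': ['a', 'a'], 't': [2, 6], 'marker': ['x', 'y']})
    # strategy='forward': each left row grabs the first right row (same id)
    # whose t is >= its own t, so t=1 picks 'x' and t=5 picks 'y'
    left.join_asof(right, on='t', by='id', strategy='forward')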

    You could nest most of the rest of the calcs without repeating yourself or using so many contexts, but I left it really verbose so it's easy to deconstruct. The reason there are so many calls to with_columns is that you can't set and use a column in the same context, so any time you make a column that you want to use again you've got to chain another context.
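
    For instance, this hypothetical two-step version (the double/quadruple columns are made up) shows why the chaining is needed; referencing a column created in the same with_columns raises a ColumnNotFoundError:

    # Fails: 'double' does not exist yet inside the same context
    # df.with_columns(double=pl.col('value') * 2, quadruple=pl.col('double') * 2)

    # Works: chain a second context so 'double' is available to the next one
    df.with_columns(double=pl.col('value') * 2).with_columns(quadruple=pl.col('double') * 2)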

    Output (excluding intermediate columns)

    shape: (8, 3)
    ┌─────┬─────────────────────┬───────────┐
    │ id  ┆ timestamp           ┆ value     │
    │ --- ┆ ---                 ┆ ---       │
    │ str ┆ datetime[μs]        ┆ f64       │
    ╞═════╪═════════════════════╪═══════════╡
    │ a   ┆ 2023-09-13 14:05:34 ┆ 10.0      │
    │ a   ┆ 2023-09-13 14:15:04 ┆ 17.770961 │
    │ a   ┆ 2023-09-13 14:30:01 ┆ 30.0      │
    │ b   ┆ 2023-09-13 12:12:02 ┆ 5.0       │
    │ b   ┆ 2023-09-13 12:15:02 ┆ 10.0      │
    │ b   ┆ 2023-09-13 12:30:07 ┆ 20.055556 │
    │ b   ┆ 2023-09-13 12:45:01 ┆ 29.988889 │
    │ b   ┆ 2023-09-13 13:00:02 ┆ 40.0      │
    └─────┴─────────────────────┴───────────┘
    

    Numpy can do it

    Here's a hacky (as if what's above isn't hacky) way to get numpy to do the work.

    import numpy as np

    finaldf = []
    # cast to Float64 so the interpolated floats concat cleanly with the originals
    df = df.with_columns(pl.col('value').cast(pl.Float64))
    for little_df in df.partition_by('id'):
        # x: physical (integer) timestamps of the rows that need filling
        x = little_df.filter(pl.col('value').is_null()).select(pl.col('timestamp').to_physical()).to_numpy()
        # xp, fp: the known timestamps and values to interpolate between
        xp, fp = little_df.filter(pl.col('value').is_not_null()).select('timestamp', 'value').to_numpy().transpose()
        finaldf.append(
            pl.concat([
                little_df.filter(pl.col('value').is_not_null()).lazy(),
                little_df.filter(pl.col('value').is_null()).with_columns(value=pl.Series(np.interp(x, xp, fp).transpose()[0])).lazy()
            ])
        )
    finaldf = pl.concat(finaldf).sort(['id', 'timestamp']).collect()
    finaldf
    shape: (8, 3)
    ┌─────┬─────────────────────┬───────────┐
    │ id  ┆ timestamp           ┆ value     │
    │ --- ┆ ---                 ┆ ---       │
    │ str ┆ datetime[μs]        ┆ f64       │
    ╞═════╪═════════════════════╪═══════════╡
    │ a   ┆ 2023-09-13 14:05:34 ┆ 10.0      │
    │ a   ┆ 2023-09-13 14:15:04 ┆ 17.770961 │
    │ a   ┆ 2023-09-13 14:30:01 ┆ 30.0      │
    │ b   ┆ 2023-09-13 12:12:02 ┆ 5.0       │
    │ b   ┆ 2023-09-13 12:15:02 ┆ 10.0      │
    │ b   ┆ 2023-09-13 12:30:07 ┆ 20.055556 │
    │ b   ┆ 2023-09-13 12:45:01 ┆ 29.988889 │
    │ b   ┆ 2023-09-13 13:00:02 ┆ 40.0      │
    └─────┴─────────────────────┴───────────┘
    

    Another more concise Polars way

    On the first round I was fixated on reproducing all the same intermediate columns, but if I just go straight for the answer we can do this...

    (
        df.join_asof(
            df.filter(pl.col('value').is_not_null())
            .with_columns(
                value_slope=(pl.col('value') - pl.col('value').shift().over('id'))
                            / (pl.col('timestamp') - pl.col('timestamp').shift().over('id')),
                value_slope_since=pl.col('timestamp').shift(),
                value_base=pl.col('value').shift()
            )
            .drop('value'),
            on='timestamp', by='id', strategy='forward'
        )
        .select(
            'id', 'timestamp',
            value=pl.coalesce(
                pl.col('value'),
                pl.col('value_base')
                + pl.col('value_slope') * (pl.col('timestamp') - pl.col('value_slope_since'))
            )
        )
    )
    

    An extensible function

    def interp(df, y_col, id_cols=None):
        if not isinstance(y_col, str):
            raise ValueError("y_col should be string")
        if isinstance(id_cols, str):
            id_cols=[id_cols]
        if id_cols is None:
            id_cols=['__dummyid']
            df=df.with_columns(__dummyid=0)
        lf=df.select(id_cols + [y_col]).lazy()
        value_cols=[x for x in df.columns if x not in id_cols and x!=y_col]
        for value_col in value_cols:
            lf=lf.join(
                df.join_asof(
                    df.filter(pl.col(value_col).is_not_null())
                    .select(
                        *id_cols, y_col,
                        __value_slope=(pl.col(value_col)-pl.col(value_col).shift().over(id_cols))/(pl.col(y_col)-pl.col(y_col).shift().over(id_cols)), 
                        __value_slope_since=pl.col(y_col).shift(),
                        __value_base=pl.col(value_col).shift()
                        ),
                    on=y_col, by=id_cols, strategy='forward'
                )
                .select(
                    id_cols+ [y_col] + [pl.coalesce(pl.col(value_col), 
                        pl.coalesce(pl.col('__value_base'), pl.col('__value_base').shift(-1))+
                        pl.coalesce(pl.col('__value_slope'), pl.col('__value_slope').shift(-1))*(pl.col(y_col)-
                        pl.coalesce(pl.col('__value_slope_since'), pl.col('__value_slope_since').shift(-1)))).alias(value_col)]
                    )
                .lazy(),
                on=[y_col]+id_cols
                )
        if id_cols[0]=='__dummyid':
            lf=lf.select(pl.exclude('__dummyid'))
        return lf.collect()
    

    With this function you can just do

    interp(df, "timestamp", "id")
    

    where the first argument is the df and the second is your time (or y) column. The third, optional, parameter is for your id column(s); it can take a list or a single string. Any columns in the df that weren't given as the time or id columns are inferred to be values, and all of them will be interpolated.
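
    If you have no id column at all, you can omit the third argument and the function falls back to its internal __dummyid. For example (just an illustration, taking a single-id slice of the example df so there is genuinely nothing to group by):

    interp(df.filter(pl.col('id') == 'b').drop('id'), 'timestamp')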

    If you monkey patch it onto pl.DataFrame, you can use it as a DataFrame method like this

    pl.DataFrame.interp=interp
    df.interp('timestamp','id')