How can I interpolate missing values based on the sum of the gap using pyspark?


I have a time series dataset with four fields: user_id, timestamp, miles, and total_mileage. miles is the number of miles driven in one timestep; total_mileage is the car's mileage at the end of that timestep. Both miles and total_mileage have missing values, and one or more consecutive rows can be missing. I want to interpolate the missing values for miles so that they add up to the total miles driven across the gap. The timesteps are equally spaced. For example:

user_id  timestamp         miles  total_mileage
1        2023-01-01 00:00  3      10
1        2023-01-01 01:00  null   null
1        2023-01-01 02:00  null   null
1        2023-01-01 03:00  null   null
1        2023-01-01 04:00  4      20

Should become:

user_id  timestamp         miles  total_mileage  was_missing
1        2023-01-01 00:00  3      10             0
1        2023-01-01 01:00  2      12             1
1        2023-01-01 02:00  2      14             1
1        2023-01-01 03:00  2      16             1
1        2023-01-01 04:00  4      20             0

There are multiple users in my dataset, so this needs to be done for each user.
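
To make the target arithmetic explicit: the miles driven during a gap are the mileage just before the next known timestep (its total_mileage minus its miles) minus the last known total_mileage, and that amount is split evenly over the missing timesteps. The following is a minimal plain-Python sketch of that calculation for a single gap, not Spark code; `fill_gap` and its parameter names are illustrative and assume equally spaced timesteps, as stated above.

```python
def fill_gap(prev_total, next_total, next_miles, n_missing):
    """Evenly spread the miles driven during one gap over its missing steps.

    prev_total: total_mileage on the last known row before the gap
    next_total: total_mileage on the first known row after the gap
    next_miles: miles on that first known row after the gap
    n_missing:  number of consecutive missing rows
    """
    # Mileage at the end of the last missing step (16 in the example).
    end_of_gap_total = next_total - next_miles
    # Miles driven during the gap, split evenly over the missing steps.
    step = (end_of_gap_total - prev_total) / n_missing
    miles = [step] * n_missing
    totals = [prev_total + step * (i + 1) for i in range(n_missing)]
    return miles, totals


miles, totals = fill_gap(prev_total=10, next_total=20, next_miles=4, n_missing=3)
print(miles)   # [2.0, 2.0, 2.0]
print(totals)  # [12.0, 14.0, 16.0]
```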

I wrote this function:

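from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F

# The column arguments are plain column names, so they are typed as str.
def id_missing_gaps(df: DataFrame, val_col: str, id_col: str, ts_col: str) -> DataFrame: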
    windowSpec = Window.partitionBy(id_col).orderBy(ts_col)
    # Create 'value_forward' column using lead function to look at the next row's 'value'
    df = df.withColumn("value_forward", F.lead(val_col).over(windowSpec))
    # Create 'value_backward' column using lag function to look at the previous row's 'value'
    df = df.withColumn("value_backward", F.lag(val_col).over(windowSpec))
    # Using lead() to look at the next row's 'value' for is_before_gap
    df = df.withColumn("next_value", F.lead(val_col).over(windowSpec))

    df = df.withColumn("is_before_gap", F.when((F.col(val_col).isNotNull()) & 
                        (F.lead(val_col).over(windowSpec).isNull()) &
                        (F.lead(ts_col).over(windowSpec).isNotNull()), # Ensure it's not the last row
                        True).otherwise(False))

    df = df.withColumn("is_after_gap", F.when(
                            (F.col(val_col).isNotNull()) & 
                            (F.col("value_backward").isNull()) &
                            (F.lag(ts_col).over(windowSpec).isNotNull()), # Ensure it's not the first row
                            True).otherwise(False))

    # Clean up temporary columns
    df = df.drop("next_value", "value_forward", "value_backward")
    
    df = df.withColumn("is_gap", F.when(((F.col(val_col).isNull()) |
                        (F.col("is_before_gap") == True) |
                        (F.col("is_after_gap") == True)), 
                        1).otherwise(0))
    
    df = df.filter(F.col("is_gap") == 1)

    return df

This identifies each 'gap' of missing values, together with the values immediately before and after it. I thought I could use the resulting dataframe to interpolate all values, but I haven't found a way to achieve what I want.

So for the example above the output of my function is:

user_id  timestamp            miles  total_mileage  is_before_gap  is_after_gap  is_gap
1        2023-01-01 00:00:00  3      10             true           false         1
1        2023-01-01 01:00:00  null   null           false          false         1
1        2023-01-01 02:00:00  null   null           false          false         1
1        2023-01-01 03:00:00  null   null           false          false         1
1        2023-01-01 04:00:00  4      20             false          true          1

I did find this answer about interpolating in PySpark: How to interpolate a column within a grouped object in PySpark? I also read the Medium article linked from it: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1 But that approach doesn't take total_mileage into consideration.

One option I'm considering is to calculate total_mileage at the last timestep of each gap (16 in the example), interpolate the total_mileage column with the method from the link above, and then use those values to calculate the miles column. This seems inefficient, however. The data I'm using has millions of rows, so I'm hesitant to use pyspark pandas and/or a UDF. If anyone can point me in the right direction on how to proceed, that would be much appreciated.
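
The option described above (interpolate total_mileage first, then derive miles from it) can be sketched in plain Python to check that it gives the expected numbers. This is only an illustration of the idea on in-memory lists for one user, not a Spark implementation; `interpolate_totals_then_diff` is a hypothetical name, and the sketch assumes miles and total_mileage are always missing together and leaves leading or trailing gaps untouched.

```python
def interpolate_totals_then_diff(miles, totals):
    """Fill gaps in totals linearly, then derive miles as consecutive differences."""
    miles, totals = list(miles), list(totals)
    # Positions where total_mileage is known.
    known = [i for i, t in enumerate(totals) if t is not None]
    for left, right in zip(known, known[1:]):
        n_missing = right - left - 1
        if n_missing == 0:
            continue  # adjacent known rows, nothing to fill
        # Mileage at the end of the last missing step (16 in the example).
        end_of_gap = totals[right] - miles[right]
        step = (end_of_gap - totals[left]) / n_missing
        for k in range(1, n_missing + 1):
            totals[left + k] = totals[left] + step * k
            # miles is the increase in total_mileage over one timestep.
            miles[left + k] = totals[left + k] - totals[left + k - 1]
    return miles, totals


miles_filled, totals_filled = interpolate_totals_then_diff(
    miles=[3, None, None, None, 4],
    totals=[10, None, None, None, 20],
)
print(miles_filled)   # [3, 2.0, 2.0, 2.0, 4]
print(totals_filled)  # [10, 12.0, 14.0, 16.0, 20]
```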


Solution

  • This was a fun problem to solve.

    In PySpark, you can populate a column over a window specification with the first or the last non-null value.

    We can then identify each run of consecutive nulls as a group and rank the rows within it.

    Once we have those two pieces, calculating the interpolated values is just a matter of arithmetic on the populated values and the rank.
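
Before the Spark version, the three building blocks can be traced on plain Python lists. This is only a sketch of the idea for a single user, using the sample values from the working example; the helper names `forward_fill` and `backward_fill` are illustrative, and the variable names are chosen to echo the window columns used in the Spark code.

```python
from itertools import accumulate


def forward_fill(values):
    """Last non-null value at or before each position."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


def backward_fill(values):
    """First non-null value at or after each position."""
    return forward_fill(values[::-1])[::-1]


miles = [3, None, None, None, 4, None, None, None, 4]
total = [10, None, None, None, 20, None, None, None, 30]

# Starting mileage of each gap: last known total_mileage.
last_known = forward_fill(total)
# Mileage at the end of each gap: next known (total_mileage - miles).
diff = [t - m if t is not None else None for t, m in zip(total, miles)]
end_of_gap = backward_fill(diff)

# A group starts at a known row whose next row is missing (or absent).
nxt = total[1:] + [None]
starts = [1 if t is not None and n is None else 0 for t, n in zip(total, nxt)]
group_id = list(accumulate(starts))

# Position of each row within its group, starting at 1.
rank = []
for i, g in enumerate(group_id):
    rank.append(1 if i == 0 or g != group_id[i - 1] else rank[-1] + 1)

print(last_known)  # [10, 10, 10, 10, 20, 20, 20, 20, 30]
print(end_of_gap)  # [7, 16, 16, 16, 16, 26, 26, 26, 26]
print(group_id)    # [1, 1, 1, 1, 2, 2, 2, 2, 3]
print(rank)        # [1, 2, 3, 4, 1, 2, 3, 4, 1]
```

For a missing row, the per-step miles are then (end_of_gap - last_known) divided by the number of nulls in its group, and the running total is last_known plus that step times (rank - 1).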

    Here's a working example.

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession
    # Note: this `sum` is Spark's column aggregate and shadows Python's built-in sum.
    from pyspark.sql.functions import col, sum, when
    from pyspark.sql.window import Window
    
    # Sample data
    data = [
        ("1", "2023-01-01 00:00", 3, 10),
        ("1", "2023-01-01 01:00", None, None),
        ("1", "2023-01-01 02:00", None, None),
        ("1", "2023-01-01 03:00", None, None),
        ("1", "2023-01-01 04:00", 4, 20),
        ("1", "2023-01-01 05:00", None, None),
        ("1", "2023-01-01 06:00", None, None),
        ("1", "2023-01-01 07:00", None, None),
        ("1", "2023-01-01 08:00", 4, 30)
    
    ]
    
    schema = ["user_id", "timestamp", "miles", "total_mileage"]
    
    
    spark = SparkSession.builder.appName("InterpolateNulls").getOrCreate()
    
    df = spark.createDataFrame(data, schema=schema)
    df = df.withColumn("was_missing", when(col("miles").isNull(), 1).otherwise(0))
    df.show()
    
    windowSpecLast = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
    windowSpecFirst = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(0, Window.unboundedFollowing)
    windowSpecNormal = Window.partitionBy("user_id").orderBy("timestamp")
    
    df = df.withColumn("diff_mile", col("total_mileage") - col("miles"))
    
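    # last_nonnull_mileage: last known total_mileage at or before each row (the mileage at the start of the gap).
    # first_nonnull_mileage: next known (total_mileage - miles), i.e. the mileage at the end of the gap.
    # first_mileage: total_mileage of the next row, used below to detect where a group starts.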
    df = df.withColumn("last_nonnull_mileage", F.last("total_mileage", ignorenulls=True).over(windowSpecLast))
    df = df.withColumn("first_nonnull_mileage", F.first("diff_mile", ignorenulls=True).over(windowSpecFirst))
    df = df.withColumn("first_mileage", F.lead("total_mileage").over(windowSpecNormal))
    
    df.show()
    
    df = df.withColumn("start_col", when(
        ((col("total_mileage").isNotNull()) & (col("first_mileage").isNull())), 1).otherwise(0))
    
    
    windowSpec = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
    
    df = df.withColumn("groupId", sum("start_col").over(windowSpec))
    
    df_nulls_grouped = df.groupby("groupId", "user_id").agg(F.count(when(col("miles").isNull(), 1)).alias("nulls_in_miles"))
    
    # Display the result
    df_nulls_grouped.show()
    
    df_new = df.join(df_nulls_grouped, on=["user_id", "groupId"], how="inner")
    
    df_new.orderBy("timestamp").show()
    
    windowSpecRankOverNulls = Window.partitionBy("user_id", "groupId").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
    
    df_ranked = df_new.withColumn("ranked", F.rank().over(windowSpecRankOverNulls))
    
    df_ranked.show()
    
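    # Per-step miles: miles driven across the gap divided by the number of missing rows.
    # The values computed on known rows are meaningless and are discarded in the next step.
    df_ranked = df_ranked.withColumn("miles_inter", (col("first_nonnull_mileage") - col("last_nonnull_mileage")) / col("nulls_in_miles"))
    # Interpolated running total: starting mileage plus one step per preceding row in the group.
    df_ranked = df_ranked.withColumn("total_mileage_inter", col("last_nonnull_mileage") + (col("miles_inter") * (col("ranked") - 1)))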
    
    df_ranked.show()
    
    df_final = df_ranked.withColumn("miles_final", when(col("miles").isNotNull(), col("miles")).otherwise(col("miles_inter")))
    df_final = df_final.withColumn("total_mileage_final", when(col("total_mileage").isNotNull(), col("total_mileage")).otherwise(col("total_mileage_inter")))
    df_final.show()
    
    df_final = df_final.select("user_id", "timestamp", "miles_final", "total_mileage_final", "was_missing")
    df_final.show()
    

    Final Output :

    +-------+----------------+-----------+-------------------+-----------+
    |user_id|       timestamp|miles_final|total_mileage_final|was_missing|
    +-------+----------------+-----------+-------------------+-----------+
    |      1|2023-01-01 00:00|        3.0|               10.0|          0|
    |      1|2023-01-01 01:00|        2.0|               12.0|          1|
    |      1|2023-01-01 02:00|        2.0|               14.0|          1|
    |      1|2023-01-01 03:00|        2.0|               16.0|          1|
    |      1|2023-01-01 04:00|        4.0|               20.0|          0|
    |      1|2023-01-01 05:00|        2.0|               22.0|          1|
    |      1|2023-01-01 06:00|        2.0|               24.0|          1|
    |      1|2023-01-01 07:00|        2.0|               26.0|          1|
    |      1|2023-01-01 08:00|        4.0|               30.0|          0|
    +-------+----------------+-----------+-------------------+-----------+
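
As a quick sanity check on the final table, each row's miles should equal the increase in total mileage since the previous row. The sketch below runs that check in plain Python on the values copied from the output above; `inconsistent_rows` is an illustrative helper, not part of the answer's code.

```python
# (timestamp, miles_final, total_mileage_final, was_missing), copied from the final output
rows = [
    ("2023-01-01 00:00", 3.0, 10.0, 0),
    ("2023-01-01 01:00", 2.0, 12.0, 1),
    ("2023-01-01 02:00", 2.0, 14.0, 1),
    ("2023-01-01 03:00", 2.0, 16.0, 1),
    ("2023-01-01 04:00", 4.0, 20.0, 0),
    ("2023-01-01 05:00", 2.0, 22.0, 1),
    ("2023-01-01 06:00", 2.0, 24.0, 1),
    ("2023-01-01 07:00", 2.0, 26.0, 1),
    ("2023-01-01 08:00", 4.0, 30.0, 0),
]


def inconsistent_rows(rows, tol=1e-9):
    """Timestamps where miles differs from the increase in total mileage."""
    bad = []
    for prev, cur in zip(rows, rows[1:]):
        increase = cur[2] - prev[2]
        if abs(increase - cur[1]) > tol:
            bad.append(cur[0])
    return bad


print(inconsistent_rows(rows))  # []
```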
    

    Full output (every df.show() call, in order):

    +-------+----------------+-----+-------------+-----------+
    |user_id|       timestamp|miles|total_mileage|was_missing|
    +-------+----------------+-----+-------------+-----------+
    |      1|2023-01-01 00:00|    3|           10|          0|
    |      1|2023-01-01 01:00| NULL|         NULL|          1|
    |      1|2023-01-01 02:00| NULL|         NULL|          1|
    |      1|2023-01-01 03:00| NULL|         NULL|          1|
    |      1|2023-01-01 04:00|    4|           20|          0|
    |      1|2023-01-01 05:00| NULL|         NULL|          1|
    |      1|2023-01-01 06:00| NULL|         NULL|          1|
    |      1|2023-01-01 07:00| NULL|         NULL|          1|
    |      1|2023-01-01 08:00|    4|           30|          0|
    +-------+----------------+-----+-------------+-----------+
    
    +-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
    |user_id|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|
    +-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
    |      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|
    |      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|
    |      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|
    |      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|
    |      1|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|
    |      1|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|
    |      1|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|
    |      1|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|
    |      1|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|
    +-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
    
    +-------+-------+--------------+
    |groupId|user_id|nulls_in_miles|
    +-------+-------+--------------+
    |      1|      1|             3|
    |      2|      1|             3|
    |      3|      1|             0|
    +-------+-------+--------------+
    
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
    |user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
    |      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|
    |      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|
    |      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|
    |      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|
    |      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|
    |      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|
    |      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|
    |      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|
    |      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
    
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
    |user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
    |      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|     1|
    |      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     2|
    |      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     3|
    |      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|     4|
    |      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|     1|
    |      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     2|
    |      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     3|
    |      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|     4|
    |      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|     1|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
    
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
    |user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|        miles_inter|total_mileage_inter|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
    |      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|     1|               -1.0|               10.0|
    |      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     2|                2.0|               12.0|
    |      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     3|                2.0|               14.0|
    |      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|     4|                2.0|               16.0|
    |      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|     1|-1.3333333333333333|               20.0|
    |      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     2|                2.0|               22.0|
    |      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     3|                2.0|               24.0|
    |      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|     4|                2.0|               26.0|
    |      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|     1|               NULL|               NULL|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
    
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
    |user_id|groupId|       timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|        miles_inter|total_mileage_inter|miles_final|total_mileage_final|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
    |      1|      1|2023-01-01 00:00|    3|           10|          0|        7|                  10|                    7|         NULL|        1|             3|     1|               -1.0|               10.0|        3.0|               10.0|
    |      1|      1|2023-01-01 01:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     2|                2.0|               12.0|        2.0|               12.0|
    |      1|      1|2023-01-01 02:00| NULL|         NULL|          1|     NULL|                  10|                   16|         NULL|        0|             3|     3|                2.0|               14.0|        2.0|               14.0|
    |      1|      1|2023-01-01 03:00| NULL|         NULL|          1|     NULL|                  10|                   16|           20|        0|             3|     4|                2.0|               16.0|        2.0|               16.0|
    |      1|      2|2023-01-01 04:00|    4|           20|          0|       16|                  20|                   16|         NULL|        1|             3|     1|-1.3333333333333333|               20.0|        4.0|               20.0|
    |      1|      2|2023-01-01 05:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     2|                2.0|               22.0|        2.0|               22.0|
    |      1|      2|2023-01-01 06:00| NULL|         NULL|          1|     NULL|                  20|                   26|         NULL|        0|             3|     3|                2.0|               24.0|        2.0|               24.0|
    |      1|      2|2023-01-01 07:00| NULL|         NULL|          1|     NULL|                  20|                   26|           30|        0|             3|     4|                2.0|               26.0|        2.0|               26.0|
    |      1|      3|2023-01-01 08:00|    4|           30|          0|       26|                  30|                   26|         NULL|        1|             0|     1|               NULL|               NULL|        4.0|               30.0|
    +-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
    
    +-------+----------------+-----------+-------------------+-----------+
    |user_id|       timestamp|miles_final|total_mileage_final|was_missing|
    +-------+----------------+-----------+-------------------+-----------+
    |      1|2023-01-01 00:00|        3.0|               10.0|          0|
    |      1|2023-01-01 01:00|        2.0|               12.0|          1|
    |      1|2023-01-01 02:00|        2.0|               14.0|          1|
    |      1|2023-01-01 03:00|        2.0|               16.0|          1|
    |      1|2023-01-01 04:00|        4.0|               20.0|          0|
    |      1|2023-01-01 05:00|        2.0|               22.0|          1|
    |      1|2023-01-01 06:00|        2.0|               24.0|          1|
    |      1|2023-01-01 07:00|        2.0|               26.0|          1|
    |      1|2023-01-01 08:00|        4.0|               30.0|          0|
    +-------+----------------+-----------+-------------------+-----------+