I have a timeseries dataset with four fields: user_id, timestamp, miles, and total_mileage. miles is the number of miles driven in one timestep; total_mileage is the mileage of the car at the end of that timestep. Both miles and total_mileage have missing values, and a gap can span one or more consecutive rows. I want to interpolate the missing miles values by spreading the total distance driven across each gap evenly over its missing timesteps. The timesteps are equally spaced. So, for example:
user_id | timestamp | miles | total_mileage |
---|---|---|---|
1 | 2023-01-01 00:00 | 3 | 10 |
1 | 2023-01-01 01:00 | null | null |
1 | 2023-01-01 02:00 | null | null |
1 | 2023-01-01 03:00 | null | null |
1 | 2023-01-01 04:00 | 4 | 20 |
Should become:
user_id | timestamp | miles | total_mileage | was_missing |
---|---|---|---|---|
1 | 2023-01-01 00:00 | 3 | 10 | 0 |
1 | 2023-01-01 01:00 | 2 | 12 | 1 |
1 | 2023-01-01 02:00 | 2 | 14 | 1 |
1 | 2023-01-01 03:00 | 2 | 16 | 1 |
1 | 2023-01-01 04:00 | 4 | 20 | 0 |
There are multiple users in my dataset, so this needs to be done for each user.
I wrote this function:
from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F

def id_missing_gaps(df: DataFrame, val_col: str, id_col: str, ts_col: str) -> DataFrame:
    windowSpec = Window.partitionBy(id_col).orderBy(ts_col)
    # Look at the next and the previous row's value
    df = df.withColumn("value_forward", F.lead(val_col).over(windowSpec))
    df = df.withColumn("value_backward", F.lag(val_col).over(windowSpec))
    # Non-null row directly before a gap (and not the last row of the partition)
    df = df.withColumn("is_before_gap", F.when(
        (F.col(val_col).isNotNull()) &
        (F.col("value_forward").isNull()) &
        (F.lead(ts_col).over(windowSpec).isNotNull()),  # Ensure it's not the last row
        True).otherwise(False))
    # Non-null row directly after a gap (and not the first row of the partition)
    df = df.withColumn("is_after_gap", F.when(
        (F.col(val_col).isNotNull()) &
        (F.col("value_backward").isNull()) &
        (F.lag(ts_col).over(windowSpec).isNotNull()),  # Ensure it's not the first row
        True).otherwise(False))
    # Clean up temporary columns
    df = df.drop("value_forward", "value_backward")
    # A row is part of a gap if its value is missing or it borders a gap
    df = df.withColumn("is_gap", F.when(
        (F.col(val_col).isNull()) |
        (F.col("is_before_gap")) |
        (F.col("is_after_gap")),
        1).otherwise(0))
    return df.filter(F.col("is_gap") == 1)
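I call it on the example data like this (df holds the rows from the table above):

df_gaps = id_missing_gaps(df, val_col="miles", id_col="user_id", ts_col="timestamp")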
This identifies each 'gap' of missing values, together with the row immediately before and the row immediately after it. I thought I could use the resulting dataframe to interpolate all values, but I haven't found a way to achieve what I want.
So, for the example above, the output of my function is:
user_id | timestamp | miles | total_mileage | is_before_gap | is_after_gap | is_gap |
---|---|---|---|---|---|---|
1 | 2023-01-01 00:00:00 | 3 | 10 | true | false | 1 |
1 | 2023-01-01 01:00:00 | null | null | false | false | 1 |
1 | 2023-01-01 02:00:00 | null | null | false | false | 1 |
1 | 2023-01-01 03:00:00 | null | null | false | false | 1 |
1 | 2023-01-01 04:00:00 | 4 | 20 | false | true | 1 |
I did find this answer about interpolating in PySpark: How to interpolate a column within a grouped object in PySpark? And I read the Medium article linked from it: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1 But that doesn't take total_mileage into consideration.
One option I'm considering is calculating the total_mileage at the last timestep of the missing values (in the example above that would be 16), interpolating the total_mileage column with the method from the link, and then using those values to calculate the miles column; a rough sketch of this is below. This seems inefficient, however. The data I'm using has millions of rows, so I'm hesitant to use pandas-on-Spark and/or a UDF. If anyone can point me in the right direction on how to proceed, that would be much appreciated.
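To make that option concrete, here is a rough, untested sketch of the idea (w_fwd, w_bwd, last_known, and next_known are just placeholder names, and df is assumed to hold the raw rows):

import pyspark.sql.functions as F
from pyspark.sql import Window

w_fwd = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
w_bwd = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(0, Window.unboundedFollowing)

# Last known total_mileage at or before each row (mileage at the start of the gap)
df = df.withColumn("last_known", F.last("total_mileage", ignorenulls=True).over(w_fwd))
# Mileage the gap should reach: the next known total_mileage minus the miles
# driven in that step (20 - 4 = 16 in the example)
df = df.withColumn("next_known", F.first(F.col("total_mileage") - F.col("miles"), ignorenulls=True).over(w_bwd))
# From here: interpolate total_mileage linearly between last_known and
# next_known, then recover miles as total_mileage minus its lag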
This was a fun problem to solve.
In PySpark, you can fill a column over a window specification with the first or last non-null value.
We can also identify each run of consecutive nulls as a group and rank the rows within each group.
Once we have those two pieces, calculating the interpolated values is just a matter of arithmetic on the filled values and the rank.
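For the first gap in the example above: the last non-null total_mileage before the gap is 10, and the mileage at the end of the gap is the next non-null total_mileage minus the miles driven in that step, i.e. 20 - 4 = 16. Spread over the 3 missing rows, that gives (16 - 10) / 3 = 2 miles per step, so total_mileage fills in as 12, 14, 16.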
Here's a working example.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when
from pyspark.sql.window import Window
# Sample data
data = [
    ("1", "2023-01-01 00:00", 3, 10),
    ("1", "2023-01-01 01:00", None, None),
    ("1", "2023-01-01 02:00", None, None),
    ("1", "2023-01-01 03:00", None, None),
    ("1", "2023-01-01 04:00", 4, 20),
    ("1", "2023-01-01 05:00", None, None),
    ("1", "2023-01-01 06:00", None, None),
    ("1", "2023-01-01 07:00", None, None),
    ("1", "2023-01-01 08:00", 4, 30),
]
schema = ["user_id", "timestamp", "miles", "total_mileage"]
spark = SparkSession.builder.appName("InterpolateNulls").getOrCreate()
df = spark.createDataFrame(data, schema=schema)
df = df.withColumn("was_missing", when(col("miles").isNull(), 1).otherwise(0))
df.show()
# Windows: up to and including the current row, from the current row onward,
# and a plain ordered window
windowSpecLast = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
windowSpecFirst = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(0, Window.unboundedFollowing)
windowSpecNormal = Window.partitionBy("user_id").orderBy("timestamp")
df = df.withColumn("diff_mile", col("total_mileage") - col("miles"))
# Identify the rows that are immediately after a non-null row (start of window) and
# rows immediately before a non-null row (end of window)
df = df.withColumn("last_nonnull_mileage", F.last("total_mileage", ignorenulls=True).over(windowSpecLast))
df = df.withColumn("first_nonnull_mileage", F.first("diff_mile", ignorenulls=True).over(windowSpecFirst))
df = df.withColumn("first_mileage", F.lead("total_mileage").over(windowSpecNormal))
df.show()
df = df.withColumn("start_col", when(
((col("total_mileage").isNotNull()) & (col("first_mileage").isNull())), 1).otherwise(0))
windowSpec = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
df = df.withColumn("groupId", sum("start_col").over(windowSpec))
# Count the missing rows in each group
df_nulls_grouped = df.groupby("groupId", "user_id").agg(F.count(when(col("miles").isNull(), 1)).alias("nulls_in_miles"))
# Display the result
df_nulls_grouped.show()
# Join the per-group null counts back onto the rows
df_new = df.join(df_nulls_grouped, on=["user_id", "groupId"], how="inner")
df_new.orderBy("timestamp").show()
# Rank rows within each group by timestamp: the anchor row gets rank 1,
# the k-th missing row gets rank k + 1
windowSpecRankOverNulls = Window.partitionBy("user_id", "groupId").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, 0)
df_ranked = df_new.withColumn("ranked", F.rank().over(windowSpecRankOverNulls))
df_ranked.show()
df_ranked = df_ranked.withColumn("miles_inter", (col("first_nonnull_mileage") - col("last_nonnull_mileage")) / col("nulls_in_miles"))
df_ranked = df_ranked.withColumn("total_mileage_inter", col("last_nonnull_mileage") + (col("miles_inter") * (col("ranked") - 1)))
df_ranked.show()
# Keep the original values where present; otherwise use the interpolated ones
df_final = df_ranked.withColumn("miles_final", when(col("miles").isNotNull(), col("miles")).otherwise(col("miles_inter")))
df_final = df_final.withColumn("total_mileage_final", when(col("total_mileage").isNotNull(), col("total_mileage")).otherwise(col("total_mileage_inter")))
df_final.show()
df_final = df_final.select("user_id", "timestamp", "miles_final", "total_mileage_final", "was_missing")
df_final.show()
Final Output:
+-------+----------------+-----------+-------------------+-----------+
|user_id| timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
| 1|2023-01-01 00:00| 3.0| 10.0| 0|
| 1|2023-01-01 01:00| 2.0| 12.0| 1|
| 1|2023-01-01 02:00| 2.0| 14.0| 1|
| 1|2023-01-01 03:00| 2.0| 16.0| 1|
| 1|2023-01-01 04:00| 4.0| 20.0| 0|
| 1|2023-01-01 05:00| 2.0| 22.0| 1|
| 1|2023-01-01 06:00| 2.0| 24.0| 1|
| 1|2023-01-01 07:00| 2.0| 26.0| 1|
| 1|2023-01-01 08:00| 4.0| 30.0| 0|
+-------+----------------+-----------+-------------------+-----------+
Full output of every df.show() call above, in order:
+-------+----------------+-----+-------------+-----------+
|user_id| timestamp|miles|total_mileage|was_missing|
+-------+----------------+-----+-------------+-----------+
| 1|2023-01-01 00:00| 3| 10| 0|
| 1|2023-01-01 01:00| NULL| NULL| 1|
| 1|2023-01-01 02:00| NULL| NULL| 1|
| 1|2023-01-01 03:00| NULL| NULL| 1|
| 1|2023-01-01 04:00| 4| 20| 0|
| 1|2023-01-01 05:00| NULL| NULL| 1|
| 1|2023-01-01 06:00| NULL| NULL| 1|
| 1|2023-01-01 07:00| NULL| NULL| 1|
| 1|2023-01-01 08:00| 4| 30| 0|
+-------+----------------+-----+-------------+-----------+
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
|user_id| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL|
| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL|
| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL|
| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20|
| 1|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL|
| 1|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL|
| 1|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL|
| 1|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30|
| 1|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL|
+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+
+-------+-------+--------------+
|groupId|user_id|nulls_in_miles|
+-------+-------+--------------+
| 1| 1| 3|
| 2| 1| 3|
| 3| 1| 0|
+-------+-------+--------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
|user_id|groupId| timestamp|miles|total_mileage|was_missing|diff_mile|last_nonnull_mileage|first_nonnull_mileage|first_mileage|start_col|nulls_in_miles|ranked| miles_inter|total_mileage_inter|miles_final|total_mileage_final|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
| 1| 1|2023-01-01 00:00| 3| 10| 0| 7| 10| 7| NULL| 1| 3| 1| -1.0| 10.0| 3.0| 10.0|
| 1| 1|2023-01-01 01:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 2| 2.0| 12.0| 2.0| 12.0|
| 1| 1|2023-01-01 02:00| NULL| NULL| 1| NULL| 10| 16| NULL| 0| 3| 3| 2.0| 14.0| 2.0| 14.0|
| 1| 1|2023-01-01 03:00| NULL| NULL| 1| NULL| 10| 16| 20| 0| 3| 4| 2.0| 16.0| 2.0| 16.0|
| 1| 2|2023-01-01 04:00| 4| 20| 0| 16| 20| 16| NULL| 1| 3| 1|-1.3333333333333333| 20.0| 4.0| 20.0|
| 1| 2|2023-01-01 05:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 2| 2.0| 22.0| 2.0| 22.0|
| 1| 2|2023-01-01 06:00| NULL| NULL| 1| NULL| 20| 26| NULL| 0| 3| 3| 2.0| 24.0| 2.0| 24.0|
| 1| 2|2023-01-01 07:00| NULL| NULL| 1| NULL| 20| 26| 30| 0| 3| 4| 2.0| 26.0| 2.0| 26.0|
| 1| 3|2023-01-01 08:00| 4| 30| 0| 26| 30| 26| NULL| 1| 0| 1| NULL| NULL| 4.0| 30.0|
+-------+-------+----------------+-----+-------------+-----------+---------+--------------------+---------------------+-------------+---------+--------------+------+-------------------+-------------------+-----------+-------------------+
+-------+----------------+-----------+-------------------+-----------+
|user_id| timestamp|miles_final|total_mileage_final|was_missing|
+-------+----------------+-----------+-------------------+-----------+
| 1|2023-01-01 00:00| 3.0| 10.0| 0|
| 1|2023-01-01 01:00| 2.0| 12.0| 1|
| 1|2023-01-01 02:00| 2.0| 14.0| 1|
| 1|2023-01-01 03:00| 2.0| 16.0| 1|
| 1|2023-01-01 04:00| 4.0| 20.0| 0|
| 1|2023-01-01 05:00| 2.0| 22.0| 1|
| 1|2023-01-01 06:00| 2.0| 24.0| 1|
| 1|2023-01-01 07:00| 2.0| 26.0| 1|
| 1|2023-01-01 08:00| 4.0| 30.0| 0|
+-------+----------------+-----------+-------------------+-----------+