Tags: scala, apache-spark, linear-interpolation

Time series interpolation in Scala


I need to interpolate a time series in Scala.
The original data is:
2020-08-01, value1
2020-08-03, value3

I want to interpolate a value for the missing date in the middle, like this:
2020-08-01, value1
2020-08-02, value2
2020-08-03, value3

where value2 is the linearly interpolated value of value1 and value3.

Can someone please help me with sample code that does this in Scala Spark? For performance reasons, I would prefer to avoid a UDF and use spark.range, but I am open to your best solution.

Thank you!


Solution

    0. You can group by to get the min and max dates from the dataframe, build a date sequence between them, and explode it to get the full series of dates (a Scala sketch of this step follows the output below).

    from pyspark.sql.functions import *
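    # Build one row per day between the min and max dates, then
    # left-join the original data back onto that date scaffold.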
    
    df.groupBy().agg(min('date').alias('date_min'), max('date').alias('date_max')) \
      .withColumn('date', sequence(to_date('date_min'), to_date('date_max'))) \
      .withColumn('date', explode('date')) \
      .select('date') \
      .join(df, ['date'], 'left') \
      .show(10, False)
    
    +----------+-----+
    |date      |value|
    +----------+-----+
    |2020-08-01|0    |
    |2020-08-02|null |
    |2020-08-03|null |
    |2020-08-04|null |
    |2020-08-05|null |
    |2020-08-06|10   |
    +----------+-----+
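
    Since the question asks for Scala, here is a minimal Scala sketch of the same scaffold step. It assumes a dataframe df with date and value columns, and Spark 2.4+ for sequence:

    import org.apache.spark.sql.functions._

    // Build one row per day between the min and max dates, then
    // left-join the original data back onto the scaffold.
    val filled = df
      .agg(min("date").as("date_min"), max("date").as("date_max"))
      .withColumn("date", explode(sequence(to_date(col("date_min")), to_date(col("date_max")))))
      .select("date")
      .join(df, Seq("date"), "left")

    filled.orderBy("date").show(10, false)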
    

    1. The simplest approach, which works only for your exact case (a single missing day between two known values). A Scala sketch follows the output.

    from pyspark.sql.functions import *
    from pyspark.sql import Window
    
    w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
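    # Back-fill the previous known value (value_m1) and forward-fill the
    # next known value (value_p1), then average them for the missing row.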
    
    df.withColumn("value_m1",  last('value', ignorenulls=True).over(w1)) \
      .withColumn("value_p1", first('value', ignorenulls=True).over(w2)) \
      .withColumn('value', coalesce(col('value'), expr('(value_m1 + value_p1) / 2'))) \
      .show(10, False)
    
    +----------+-----+--------+--------+
    |date      |value|value_m1|value_p1|
    +----------+-----+--------+--------+
    |2020-08-01|0.0  |0       |0       |
    |2020-08-02|5.0  |0       |10      |
    |2020-08-03|10.0 |10      |10      |
    +----------+-----+--------+--------+
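
    A Scala version of this step, as a sketch under the same assumptions (note the parentheses, so the average is (value_m1 + value_p1) / 2 and not value_m1 + value_p1 / 2):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val w1 = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val w2 = Window.orderBy("date").rowsBetween(Window.currentRow, Window.unboundedFollowing)

    df.withColumn("value_m1", last("value", ignoreNulls = true).over(w1))
      .withColumn("value_p1", first("value", ignoreNulls = true).over(w2))
      .withColumn("value", coalesce(col("value"), (col("value_m1") + col("value_p1")) / 2))
      .show(10, false)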
    

    2. An improved version that handles arbitrary runs of null days. For example, when the dataframe is given by this,

    +----------+-----+
    |date      |value|
    +----------+-----+
    |2020-08-01|0    |
    |2020-08-02|null |
    |2020-08-03|null |
    |2020-08-04|null |
    |2020-08-05|null |
    |2020-08-06|10   |
    |2020-08-07|null |
    |2020-08-08|null |
    +----------+-----+
    

    then the code should be changed as follows, where each missing value becomes the distance-weighted average of the previous and next known values (a Scala sketch again follows the output):

    from pyspark.sql.functions import *
    from pyspark.sql import Window
    
    w1 = Window.orderBy('date').rowsBetween(Window.unboundedPreceding, Window.currentRow)
    w2 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
    w3 = Window.partitionBy('days_m1').orderBy('date')
    w4 = Window.partitionBy('days_p1').orderBy(desc('date'))
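    # Pass 1 (days_m1/days_p1): the cumulative count of known values assigns
    # a run id to every row. Pass 2: the row position within each run gives
    # the distance to the previous / next known value, which then serve as
    # the linear interpolation weights.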
    
    df.withColumn("value_m1",  last('value', ignorenulls=True).over(w1)) \
      .withColumn("value_p1", first('value', ignorenulls=True).over(w2)) \
      .withColumn('days_m1', count(when(col('value').isNotNull(), 1)).over(w1)) \
      .withColumn('days_p1', count(when(col('value').isNotNull(), 1)).over(w2)) \
      .withColumn('days_m1', count(lit(1)).over(w3) - 1) \
      .withColumn('days_p1', count(lit(1)).over(w4) - 1) \
      .withColumn('value', coalesce(col('value'), expr('(days_p1 * value_m1 + days_m1 * value_p1) / (days_m1 + days_p1)'))) \
      .orderBy('date') \
      .show(10, False)
    
    +----------+-----+--------+--------+-------+-------+
    |date      |value|value_m1|value_p1|days_m1|days_p1|
    +----------+-----+--------+--------+-------+-------+
    |2020-08-01|0.0  |0       |0       |0      |0      |
    |2020-08-02|2.0  |0       |10      |1      |4      |
    |2020-08-03|4.0  |0       |10      |2      |3      |
    |2020-08-04|6.0  |0       |10      |3      |2      |
    |2020-08-05|8.0  |0       |10      |4      |1      |
    |2020-08-06|10.0 |10      |10      |0      |0      |
    |2020-08-07|null |10      |null    |1      |1      |
    |2020-08-08|null |10      |null    |2      |0      |
    +----------+-----+--------+--------+-------+-------+
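
    Finally, a Scala sketch of this general version, under the same assumptions as above:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val w1 = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val w2 = Window.orderBy("date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
    val w3 = Window.partitionBy("days_m1").orderBy("date")
    val w4 = Window.partitionBy("days_p1").orderBy(desc("date"))

    df.withColumn("value_m1", last("value", ignoreNulls = true).over(w1))
      .withColumn("value_p1", first("value", ignoreNulls = true).over(w2))
      // run ids from the cumulative count of known values
      .withColumn("days_m1", count(when(col("value").isNotNull, 1)).over(w1))
      .withColumn("days_p1", count(when(col("value").isNotNull, 1)).over(w2))
      // row distances to the previous / next known value
      .withColumn("days_m1", count(lit(1)).over(w3) - 1)
      .withColumn("days_p1", count(lit(1)).over(w4) - 1)
      .withColumn("value", coalesce(col("value"),
        (col("days_p1") * col("value_m1") + col("days_m1") * col("value_p1"))
          / (col("days_m1") + col("days_p1"))))
      .orderBy("date")
      .show(10, false)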