
PySpark forward and backward fill within column level


I am trying to fill missing data in a PySpark DataFrame. The DataFrame looks like this:

+---------+---------+-------------------+----+
| latitude|longitude|      timestamplast|name|
+---------+---------+-------------------+----+
|         | 4.905615|2019-08-01 00:00:00|   1|
|51.819645|         |2019-08-01 00:00:00|   1|
| 51.81964| 4.961713|2019-08-01 00:00:00|   2|
|         |         |2019-08-01 00:00:00|   3|
| 51.82918| 4.911187|                   |   3|
| 51.82385| 4.901488|2019-08-01 00:00:03|   5|
+---------+---------+-------------------+----+

Grouping by the column "name", I want to forward fill or backward fill (whichever is necessary) only "latitude" and "longitude" ("timestamplast" should not be filled). How do I do this?

The desired output is:

+---------+---------+-------------------+----+
| latitude|longitude|      timestamplast|name|
+---------+---------+-------------------+----+
|51.819645| 4.905615|2019-08-01 00:00:00|   1|
|51.819645| 4.905615|2019-08-01 00:00:00|   1|
| 51.81964| 4.961713|2019-08-01 00:00:00|   2|
| 51.82918| 4.911187|2019-08-01 00:00:00|   3|
| 51.82918| 4.911187|                   |   3|
| 51.82385| 4.901488|2019-08-01 00:00:03|   5|
+---------+---------+-------------------+----+

In pandas this would be done as follows:

df[['longitude','latitude']] = df.groupby("name")[['longitude','latitude']].apply(lambda x: x.ffill().bfill())

How would this be done in PySpark?


Solution

  • I suggest you use the following two Window Specs:

    from pyspark.sql import Window
    w1 = Window.partitionBy('name').orderBy('timestamplast')
    w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    

    Where:

    1. w1 is the regular WindowSpec we use to calculate the forward fill; it is equivalent to the following:

      w1 = Window.partitionBy('name').orderBy('timestamplast').rowsBetween(Window.unboundedPreceding,0)
      

      See the following note from the documentation on default window frames (a short sketch verifying this equivalence appears after this list):

      Note: When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

    2. After the forward fill, we only need to fix null values at the very front of each partition, if any exist, so we can use a fixed Window frame (between Window.unboundedPreceding and Window.unboundedFollowing). This is more efficient than a running Window frame, since it requires only one aggregate; see SPARK-8638.
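
    A minimal sketch (with hypothetical sample data) verifying point 1, i.e. that w1 without an explicit frame behaves like the explicit rowsBetween version; the ordering values are distinct here, so the default range frame matches the rows frame:

    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import last

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame(
        [(1, "2019-08-01 00:00:00", None),
         (1, "2019-08-01 00:00:01", 51.819645),
         (1, "2019-08-01 00:00:02", None)],
        ["name", "timestamplast", "latitude"],
    )

    w_default = Window.partitionBy("name").orderBy("timestamplast")
    w_explicit = w_default.rowsBetween(Window.unboundedPreceding, 0)

    # With ordering defined, the default frame runs from unboundedPreceding to
    # currentRow, so both columns come out identical: a forward fill via the
    # last non-null value seen so far.
    df.withColumn("ffill_default", last("latitude", True).over(w_default)) \
      .withColumn("ffill_explicit", last("latitude", True).over(w_explicit)) \
      .show()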

    Then the x.ffill().bfill() can be handled by using coalesce + last + first based on the above two WindowSpecs: last(c, True) (ignorenulls=True) over w1 returns the most recent non-null value up to the current row (the forward fill), and where that is still null (leading nulls) coalesce falls back to first(c, True) over w2, the first non-null value in the partition (the backward fill). The sample output below uses more rows for name 1 than the question shows, to better illustrate the fill behavior:

    from pyspark.sql.functions import coalesce, last, first
    
    df.withColumn('latitude_new', coalesce(last('latitude',True).over(w1), first('latitude',True).over(w2))) \
      .select('name','timestamplast', 'latitude','latitude_new') \
      .show()
    +----+-------------------+---------+------------+
    |name|      timestamplast| latitude|latitude_new|
    +----+-------------------+---------+------------+
    |   1|2019-08-01 00:00:00|     null|   51.819645|
    |   1|2019-08-01 00:00:01|     null|   51.819645|
    |   1|2019-08-01 00:00:02|51.819645|   51.819645|
    |   1|2019-08-01 00:00:03| 51.81964|    51.81964|
    |   1|2019-08-01 00:00:04|     null|    51.81964|
    |   1|2019-08-01 00:00:05|     null|    51.81964|
    |   1|2019-08-01 00:00:06|     null|    51.81964|
    |   1|2019-08-01 00:00:07| 51.82385|    51.82385|
    +----+-------------------+---------+------------+
    

    Edit: to process (ffill+bfill) on multiple columns, use a list comprehension:

    cols = ['latitude', 'longitude']
    df_new = df.select(
        [c for c in df.columns if c not in cols]
        + [coalesce(last(c, True).over(w1), first(c, True).over(w2)).alias(c) for c in cols]
    )
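
    For reference, here is a minimal end-to-end sketch on the question's sample data; the SparkSession setup and the literal rows are assumptions for illustration, with empty cells entered as None (null in Spark):

    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import coalesce, first, last

    spark = SparkSession.builder.getOrCreate()

    # Sample rows from the question; None marks the missing cells.
    df = spark.createDataFrame(
        [(None,      4.905615, "2019-08-01 00:00:00", 1),
         (51.819645, None,     "2019-08-01 00:00:00", 1),
         (51.81964,  4.961713, "2019-08-01 00:00:00", 2),
         (None,      None,     "2019-08-01 00:00:00", 3),
         (51.82918,  4.911187, None,                  3),
         (51.82385,  4.901488, "2019-08-01 00:00:03", 5)],
        ["latitude", "longitude", "timestamplast", "name"],
    )

    w1 = Window.partitionBy('name').orderBy('timestamplast')
    w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    cols = ['latitude', 'longitude']
    df_new = df.select(
        [c for c in df.columns if c not in cols]
        + [coalesce(last(c, True).over(w1), first(c, True).over(w2)).alias(c) for c in cols]
    )
    df_new.show()

    Note that select puts the untouched columns first, so the column order differs from the input, and that null timestamps sort first within their partition under the default ascending order.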