Tags: scala, apache-spark, window-functions

Spark Window functions: is it possible to get other values directly from a row found with the first/last functions?


In Spark it is possible to get the first non-null value of a column at or after the current row in a window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val window = Window
  .orderBy("id")

val df = Seq(
  (0, "Bob", Some(123)),
  (1, "Jack", None),
  (2, "Brian", None),
  (3, "John", Some(456)),
  (4, "Edgar", None)
).toDF("id", "name", "value")

df
  .withColumn("firstNonNullValueAfterRow", first("value", true)
    .over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing)))
  .show()

Output:

+---+-----+-----+-------------------------+
| id| name|value|firstNonNullValueAfterRow|
+---+-----+-----+-------------------------+
|  0|  Bob|  123|                      123|
|  1| Jack| null|                      456|
|  2|Brian| null|                      456|
|  3| John|  456|                      456|
|  4|Edgar| null|                     null|
+---+-----+-----+-------------------------+

Question: is it possible to get another column from the row found by first(...)? I would like to get the name of the person on the row holding that first non-null value:

+---+-----+-----+-------------------------+-------------------------+
| id| name|value|firstNonNullValueAfterRow|nameOfThatPerson         |
+---+-----+-----+-------------------------+-------------------------+
|  0|  Bob|  123|                      123|                      Bob|
|  1| Jack| null|                      456|                     John|
|  2|Brian| null|                      456|                     John|
|  3| John|  456|                      456|                     John|
|  4|Edgar| null|                     null|                     null|
+---+-----+-----+-------------------------+-------------------------+

This is possible with some tricks, but I would like to know whether it can be done with the Spark window functions alone. Workaround:

val idAndNameDF = df
  .select("id", "name")
  .withColumnRenamed("id", "id2")
  .withColumnRenamed("name", "nameOfThatPerson")

df
  .withColumn("idOfFirstNotNullValue", when(col("value").isNotNull, col("id")))
  .withColumn("firstNonNullIdAfterRow", first("idOfFirstNotNullValue", true)
    .over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing)))
  .join(idAndNameDF, col("firstNonNullIdAfterRow") === col("id2"),"left")
  .show()

Workaround result:

+---+-----+-----+---------------------+----------------------+----+----------------+
| id| name|value|idOfFirstNotNullValue|firstNonNullIdAfterRow| id2|nameOfThatPerson|
+---+-----+-----+---------------------+----------------------+----+----------------+
|  0|  Bob|  123|                    0|                     0|   0|             Bob|
|  1| Jack| null|                 null|                     3|   3|            John|
|  2|Brian| null|                 null|                     3|   3|            John|
|  3| John|  456|                    3|                     3|   3|            John|
|  4|Edgar| null|                 null|                  null|null|            null|
+---+-----+-----+---------------------+----------------------+----+----------------+
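The workaround above can be tidied by folding the two withColumn calls into one, aliasing the lookup columns up front, and dropping the helper columns after the join. A self-contained sketch of that variant, using the same data (the local SparkSession setup is added here only so the snippet runs on its own):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Note: a Window with orderBy but no partitionBy pulls all rows
// into a single partition; fine for a demo, costly at scale.
val window = Window.orderBy("id")

val df = Seq(
  (0, "Bob", Some(123)),
  (1, "Jack", None),
  (2, "Brian", None),
  (3, "John", Some(456)),
  (4, "Edgar", None)
).toDF("id", "name", "value")

// Rename the lookup columns in one select instead of two renames.
val idAndNameDF = df.select($"id".as("id2"), $"name".as("nameOfThatPerson"))

val result = df
  // Single helper column: the id of the first row at/after this one
  // whose "value" is non-null.
  .withColumn("firstNonNullIdAfterRow",
    first(when($"value".isNotNull, $"id"), ignoreNulls = true)
      .over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing)))
  .join(idAndNameDF, $"firstNonNullIdAfterRow" === $"id2", "left")
  // Drop the plumbing so only the target columns remain.
  .drop("firstNonNullIdAfterRow", "id2")

result.show()
```

This produces the id, name, value, and nameOfThatPerson columns directly, without the intermediate columns visible in the workaround result above.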

Solution

  • Yes and no. No, if you mean it should be possible within the same Window clause alone; yes, if you do something extra.

    That is to say, your work-around is correct.

    There are two distinct steps:

    • Find the first 'future' non-null occurrence
    • Then look up the related data for that occurrence

    It stands to reason: treat it as a sub-query situation.
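For completeness, the "something extra" does not have to be a join: it can live inside the window expression itself. Wrapping both columns in a struct that is null exactly when value is null lets first(..., ignoreNulls = true) carry the name along with the value. A sketch, assuming a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val window = Window.orderBy("id")

val df = Seq(
  (0, "Bob", Some(123)),
  (1, "Jack", None),
  (2, "Brian", None),
  (3, "John", Some(456)),
  (4, "Edgar", None)
).toDF("id", "name", "value")

// when(...) without otherwise(...) yields null, so the struct is null
// exactly when "value" is null; first(..., ignoreNulls = true) then
// skips those rows but keeps every column wrapped in the struct.
val firstNonNull = first(
  when($"value".isNotNull, struct($"value", $"name")),
  ignoreNulls = true
).over(window.rowsBetween(Window.currentRow, Window.unboundedFollowing))

val result = df
  .withColumn("firstNonNullValueAfterRow", firstNonNull.getField("value"))
  .withColumn("nameOfThatPerson", firstNonNull.getField("name"))

result.show()
```

This yields the nameOfThatPerson column from the question's desired output in a single pass, with no self-join; the struct is simply the "extra step" expressed inside the aggregate.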