scala, apache-spark, window-functions

Spark: get a row's value from a Window


With Spark I defined a Window:

import org.apache.spark.sql.expressions.Window

val window = Window
  .partitionBy("myaggcol")
  .orderBy("datefield")
  .rowsBetween(-2, 0)

Then I can compute a new column from the window's rows, e.g.:

dataset
  .withColumn("newcol", last("diffcol").over(window) - first("diffcol").over(window))

This computes, for each row, the difference in "diffcol" between the current row and the row two positions earlier (n-2).

Now my question: how can I get the "diffcol" of the n-1 row, neither the first nor the last of the frame but the intermediate one?


Solution

  • If I understand your question correctly, the window function lag would work better than rowsBetween, as shown in the following example:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    import spark.implicits._
    
    val df = Seq(
      ("a", 1, 100), ("a", 2, 200), ("a", 3, 300), ("a", 4, 400),
      ("b", 1, 500), ("b", 2, 600), ("b", 3, 700)
    ).toDF("c1", "c2", "c3")
    
    val win = Window.partitionBy("c1").orderBy("c2")
    
    // c3Diff1: difference from the previous row (n-1); c3Diff2: difference from row n-2.
    // coalesce(..., lit(0)) replaces the null that lag returns when no such row exists.
    df.
      withColumn("c3Diff1", $"c3" - coalesce(lag("c3", 1).over(win), lit(0))).
      withColumn("c3Diff2", $"c3" - coalesce(lag("c3", 2).over(win), lit(0))).
      show
    // +---+---+---+-------+-------+
    // | c1| c2| c3|c3Diff1|c3Diff2|
    // +---+---+---+-------+-------+
    // |  b|  1|500|    500|    500|
    // |  b|  2|600|    100|    600|
    // |  b|  3|700|    100|    200|
    // |  a|  1|100|    100|    100|
    // |  a|  2|200|    100|    200|
    // |  a|  3|300|    100|    200|
    // |  a|  4|400|    100|    200|
    // +---+---+---+-------+-------+
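  • Mapped back to the question's own setup, this is a rough sketch (it assumes the "myaggcol", "datefield" and "diffcol" columns and the dataset variable from the question; the new column names prevDiff/prev2Diff are just illustrative). The n-1 value can be taken with lag over the same partitioning and ordering, without the rowsBetween frame:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    // Same partitioning and ordering as the original window; lag only needs the
    // ordering, so no rowsBetween frame is required.
    val lagWin = Window.partitionBy("myaggcol").orderBy("datefield")

    dataset
      .withColumn("prevDiff",  lag("diffcol", 1).over(lagWin))  // value of row n-1 (null on the first row)
      .withColumn("prev2Diff", lag("diffcol", 2).over(lagWin))  // value of row n-2
      .withColumn("newcol", col("diffcol") - col("prev2Diff"))  // matches last - first over rowsBetween(-2, 0) once two predecessors exist

    Note that lag also accepts a default value, e.g. lag("c3", 1, 0), which can replace the coalesce(..., lit(0)) used above.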