scala, apache-spark, apache-spark-sql, window-functions

Get the last element of a window in Spark 2.1.1


I have a dataframe with subcategories, and I want the last element of each of these subcategories.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("name").orderBy("count")
sqlContext
    .createDataFrame(
      Seq[(String, Int)](
        ("A", 1),
        ("A", 2),
        ("A", 3),
        ("B", 10),
        ("B", 20),
        ("B", 30)
      ))
    .toDF("name", "count")
    .withColumn("firstCountOfName", first("count").over(windowSpec))
    .withColumn("lastCountOfName", last("count").over(windowSpec))
    .show()

returns something strange:

+----+-----+----------------+---------------+                                   
|name|count|firstCountOfName|lastCountOfName|
+----+-----+----------------+---------------+
|   B|   10|              10|             10|
|   B|   20|              10|             20|
|   B|   30|              10|             30|
|   A|    1|               1|              1|
|   A|    2|               1|              2|
|   A|    3|               1|              3|
+----+-----+----------------+---------------+

As we can see, the first value is computed correctly, but the last isn't: it is always the value of the current row.

Does anyone have a solution to do what I want?


Solution

  • Because the window is ordered but has no explicit frame, Spark uses the default frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so last only ever sees rows up to the current one. According to the issue SPARK-20969, you should be able to get the expected results by defining explicit bounds on your window, as shown below.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    val windowSpec = Window
      .partitionBy("name")
      .orderBy("count")
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    sqlContext
      .createDataFrame(
        Seq[(String, Int)](
          ("A", 1),
          ("A", 2),
          ("A", 3),
          ("B", 10),
          ("B", 20),
          ("B", 30)
        ))
      .toDF("name", "count")
      .withColumn("firstCountOfName", first("count").over(windowSpec))
      .withColumn("lastCountOfName", last("count").over(windowSpec))
      .show()
    

    Alternatively, if you are ordering on the same column you are computing first and last on, you can replace them with min and max over a non-ordered window; that should also work properly, as sketched below.
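
    For instance, here is a minimal sketch of that min/max alternative, reusing the same sample data (the name unorderedWindowSpec is just for illustration):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Non-ordered window: aggregates are evaluated over the whole partition,
    // so min/max match first/last on a fully unbounded ordered window
    // when the ordering column and the aggregated column are the same.
    val unorderedWindowSpec = Window.partitionBy("name")

    sqlContext
      .createDataFrame(
        Seq[(String, Int)](
          ("A", 1),
          ("A", 2),
          ("A", 3),
          ("B", 10),
          ("B", 20),
          ("B", 30)
        ))
      .toDF("name", "count")
      .withColumn("firstCountOfName", min("count").over(unorderedWindowSpec))
      .withColumn("lastCountOfName", max("count").over(unorderedWindowSpec))
      .show()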