
End-dating records using window functions in Spark SQL


I have a dataframe like below

+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2999-12-31|
|   b|   3|2011-12-14|2999-12-31|
|   a|   4|2013-12-17|2999-12-31|
|   b|   8|2011-12-19|2999-12-31|
|   a|   6|2013-12-23|2999-12-31|
+----+----+----------+----------+

I need to group the records by colA, rank them by colC (the most recent date gets the highest rank), and then update colD by subtracting a day from the colC value of the next-ranked record.

The final dataframe should look like below

+----+----+----------+----------+
|colA|colB|      colC|      colD|
+----+----+----------+----------+
|   a|   2|2013-12-12|2013-12-16|
|   a|   4|2013-12-17|2013-12-22|
|   a|   6|2013-12-23|2999-12-31|
|   b|   3|2011-12-14|2011-12-18|
|   b|   8|2011-12-19|2999-12-31|
+----+----+----------+----------+

Solution

  • You can get this using window functions: lead() fetches the next row's colC within each colA partition, date_sub subtracts a day from it, and coalesce falls back to the original colD for the last record in each partition.

    scala> val df = Seq(("a",2,"2013-12-12","2999-12-31"),("b",3,"2011-12-14","2999-12-31"),("a",4,"2013-12-17","2999-12-31"),("b",8,"2011-12-19","2999-12-31"),("a",6,"2013-12-23","2999-12-31")).toDF("colA","colB","colC","colD")
    df: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]
    
    scala> val df2 = df.withColumn("colc",'colc.cast("date")).withColumn("cold",'cold.cast("date"))
    df2: org.apache.spark.sql.DataFrame = [colA: string, colB: int ... 2 more fields]
    
    scala> df2.createOrReplaceTempView("yash")
    
    scala> spark.sql(""" select cola,colb,colc,cold, rank() over(partition by cola order by colc) c1, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold2 from yash """).show
    +----+----+----------+----------+---+----------+
    |cola|colb|      colc|      cold| c1|     cold2|
    +----+----+----------+----------+---+----------+
    |   b|   3|2011-12-14|2999-12-31|  1|2011-12-18|
    |   b|   8|2011-12-19|2999-12-31|  2|2999-12-31|
    |   a|   2|2013-12-12|2999-12-31|  1|2013-12-16|
    |   a|   4|2013-12-17|2999-12-31|  2|2013-12-22|
    |   a|   6|2013-12-23|2999-12-31|  3|2999-12-31|
    +----+----+----------+----------+---+----------+
    
    
    scala> 
    

    Removing the unnecessary columns (the rank column c1 and the original cold, which is replaced by the computed value aliased as cold):

    scala> spark.sql(""" select cola,colb,colc, coalesce(date_sub(lead(colc) over(partition by cola order by colc),1),cold) as cold from yash """).show
    +----+----+----------+----------+
    |cola|colb|      colc|      cold|
    +----+----+----------+----------+
    |   b|   3|2011-12-14|2011-12-18|
    |   b|   8|2011-12-19|2999-12-31|
    |   a|   2|2013-12-12|2013-12-16|
    |   a|   4|2013-12-17|2013-12-22|
    |   a|   6|2013-12-23|2999-12-31|
    +----+----+----------+----------+
    
    
    scala>
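
    If you prefer the DataFrame API, a rough equivalent would be the sketch below (untested; it assumes the same df2 from above with colc/cold already cast to dates, and the names w and result are just illustrative):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, col, date_sub, lead}

    // same partitioning/ordering as the SQL window clause
    val w = Window.partitionBy("colA").orderBy("colC")

    // next row's colC minus one day; keep the original colD (2999-12-31)
    // for the last record of each colA partition
    val result = df2.withColumn("colD",
      coalesce(date_sub(lead("colC", 1).over(w), 1), col("colD")))

    result.orderBy("colA", "colC").show()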