Tags: apache-spark, window-functions

Change Data Capture using Apache Spark


What is the best way to solve the following problem using Apache Spark?

My dataset is as follows:

ID, DATE,       TIME, VALUE
001,2019-01-01, 0010, 150
001,2019-01-01, 0020, 150
001,2019-01-01, 0030, 160
001,2019-01-01, 0040, 160
001,2019-01-01, 0050, 150
002,2019-01-01, 0010, 151
002,2019-01-01, 0020, 151
002,2019-01-01, 0030, 161
002,2019-01-01, 0040, 162
002,2019-01-01, 0051, 152

For each ID, I need to keep only the rows where VALUE changed from the previous row (ordered by TIME); the first row of each ID is always kept.

My expected output:

ID, DATE,       TIME, VALUE
001,2019-01-01, 0010, 150
001,2019-01-01, 0030, 160
001,2019-01-01, 0050, 150
002,2019-01-01, 0010, 151
002,2019-01-01, 0030, 161
002,2019-01-01, 0040, 162
002,2019-01-01, 0051, 152

Solution

  • You can use the lag function with a Window specification:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._ // for $-syntax and toDF; spark is the active SparkSession

    val df = Seq(
      ("001", "2019-01-01", "0010", "150"),
      ("001", "2019-01-01", "0020", "150"),
      ("001", "2019-01-01", "0030", "160"),
      ("001", "2019-01-01", "0040", "160"),
      ("001", "2019-01-01", "0050", "150"),
      ("002", "2019-01-01", "0010", "151"),
      ("002", "2019-01-01", "0020", "151"),
      ("002", "2019-01-01", "0030", "161"),
      ("002", "2019-01-01", "0040", "162"),
      ("002", "2019-01-01", "0051", "152")
    ).toDF("ID", "DATE", "TIME", "VALUE")
    
    
    df
      .withColumn(
        "change",
        // lag returns null for the first row of each partition, so coalesce
        // defaults the comparison to true and the first row is always kept
        coalesce($"VALUE" =!= lag($"VALUE", 1).over(Window.partitionBy($"ID").orderBy($"TIME")), lit(true))
      )
      .where($"change")
      //.drop($"change")
      .show()
    

    gives:

    +---+----------+----+-----+------+
    | ID|      DATE|TIME|VALUE|change|
    +---+----------+----+-----+------+
    |001|2019-01-01|0010|  150|  true|
    |001|2019-01-01|0030|  160|  true|
    |001|2019-01-01|0050|  150|  true|
    |002|2019-01-01|0010|  151|  true|
    |002|2019-01-01|0030|  161|  true|
    |002|2019-01-01|0040|  162|  true|
    |002|2019-01-01|0051|  152|  true|
    +---+----------+----+-----+------+
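
    The window logic above can also be checked without a Spark session. The sketch below (plain Scala; the `Row` case class and `keepChanges` name are assumptions for this illustration) mirrors `partitionBy("ID").orderBy("TIME")` with `groupBy`/`sortBy`, and the lag comparison with `sliding(2)`:

    ```scala
    // Plain-Scala equivalent of the lag-over-window filter above.
    case class Row(id: String, date: String, time: String, value: String)

    def keepChanges(rows: Seq[Row]): Seq[Row] =
      rows
        .groupBy(_.id)                      // ~ Window.partitionBy($"ID")
        .toSeq
        .flatMap { case (_, group) =>
          val sorted = group.sortBy(_.time) // ~ .orderBy($"TIME")
          // keep the first row (lag is null there), then every row whose
          // value differs from the previous one
          sorted.headOption.toSeq ++
            sorted.sliding(2).collect {
              case Seq(prev, cur) if prev.value != cur.value => cur
            }
        }
        .sortBy(r => (r.id, r.time))

    val data = Seq(
      Row("001", "2019-01-01", "0010", "150"),
      Row("001", "2019-01-01", "0020", "150"),
      Row("001", "2019-01-01", "0030", "160"),
      Row("001", "2019-01-01", "0040", "160"),
      Row("001", "2019-01-01", "0050", "150"),
      Row("002", "2019-01-01", "0010", "151"),
      Row("002", "2019-01-01", "0020", "151"),
      Row("002", "2019-01-01", "0030", "161"),
      Row("002", "2019-01-01", "0040", "162"),
      Row("002", "2019-01-01", "0051", "152")
    )

    val kept = keepChanges(data)
    ```

    Running this yields the same seven rows as the expected output, which is a quick way to sanity-check the window expression before running it on a real cluster.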