scala · apache-spark · apache-spark-sql · window-functions

Spark: How to capture column transition and groupby on the column


id  value  prevValue  indicator
1   emp1   null       1
2   emp2   emp1       2
3   emp1   emp2       3
4   emp1   emp1       3
5   emp3   emp1       4
6   emp3   emp3       4
7   emp1   emp3       5
8   emp2   emp1       6
9   emp2   emp2       6
10  emp2   emp2       6

Let's say this whole set of rows is within one session window. I have created the prevValue column using the lag function: lag("value", 1).over(sessionWindow).as("prevValue").
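
For reference, prevValue was built roughly as follows (the exact sessionWindow definition is an assumption here, since the session column isn't shown; substitute the real session key for the hypothetical sessionId):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumed window spec: partition by a hypothetical sessionId column, order rows by id
val sessionWindow = Window.partitionBy("sessionId").orderBy("id")

val withPrev = df.withColumn("prevValue", lag("value", 1).over(sessionWindow))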

I want to create the indicator column as displayed in the table, but I haven't been successful yet.

There is a transition in value whenever the value of the previous row differs from the value of the current row. At each transition the indicator is incremented by 1; otherwise it stays the same as in the previous row.

The main reason for creating the indicator column is to group by it later.
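
For context, the downstream aggregation would look something like the sketch below (dfWithIndicator stands for the DataFrame once the indicator column exists, and the aggregates are only placeholders for whatever is actually needed per run of identical values):

// One row per contiguous run of the same value, keyed by the indicator column
dfWithIndicator.groupBy("indicator")
  .agg(first("value").as("value"), count("id").as("runLength"))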

Here is what I was trying to do to create the column, but it doesn't work. Any help would be appreciated.

.withColumn("indicator", when(col("prevValue").isNull, 1).otherwise(0))
.withColumn("indicator", 
  when(col("value") =!= col("prevValue"), lag("indicator", 1).over(sessionWindow) + 1)
  .otherwise(lag("indicator", 1, 1).over(sessionWindow)))

Solution

  • import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val indWindow = Window.orderBy("id")
    val sumWindow = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    df.withColumn("changed", when(col("value") =!= lag(col("value"), 1).over(indWindow), 1).otherwise(0))
      .withColumn("group", sum("changed").over(sumWindow))
    

    produces:

    +---+-----+-------+-----+
    | id|value|changed|group|
    +---+-----+-------+-----+
    |  1|    a|      0|    0|
    |  2|    a|      0|    0|
    |  3|    b|      1|    1|
    |  4|    c|      1|    2|
    |  5|    c|      0|    2|
    |  6|    c|      0|    2|
    +---+-----+-------+-----+
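
    Applied to the emp data from the question, the same approach gives the expected grouping; the running sum just starts at 0, so add 1 if the indicator has to start at 1 (a sketch, reusing the windows defined above):

    df.withColumn("changed",
        when(col("value") =!= lag(col("value"), 1).over(indWindow), 1).otherwise(0))
      .withColumn("indicator", sum("changed").over(sumWindow) + lit(1))
    // yields indicator = 1,2,3,3,4,4,5,6,6,6 for ids 1..10, matching the expected column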
    

    note: This is a very inefficient solution in terms of performance, as Spark has to put all the rows into a single partition. Do you have any column to partition the data on? If so, a partitioned variant is sketched below.
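
    Assuming such a column exists (sessionId below is purely hypothetical; use whatever column actually delimits your data), both window specs can be partitioned by it so the work stays distributed:

    val indWindow = Window.partitionBy("sessionId").orderBy("id")
    val sumWindow = Window.partitionBy("sessionId").orderBy("id")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // the changed/group columns are then computed exactly as above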