Tags: scala, apache-spark, apache-spark-sql, apache-zeppelin

How to assign a non-unique incrementing index (index markup) in Spark SQL that is reset to 0 when a specific value is joined from another DataFrame


There is a DataFrame of data like

|timestamp          |value|
|2021-01-01 12:00:00| 10.0|
|2021-01-01 12:00:01| 10.0|
|2021-01-01 12:00:02| 10.0|
|2021-01-01 12:00:03| 10.0|
|2021-01-01 12:00:04| 10.0|
|2021-01-01 12:00:05| 10.0|
|2021-01-01 12:00:06| 10.0|
|2021-01-01 12:00:07| 10.0|

and a DataFrame of events like

|timestamp          |event|
|2021-01-01 12:00:01| true|
|2021-01-01 12:00:05| true|

Based on that, I'd like to add one more column to the initial DataFrame that is an index of the rows since the beginning of the most recent event:

|timestamp          |value|index|
|2021-01-01 12:00:00| 10.0|    1|
|2021-01-01 12:00:01| 10.0|    2|
|2021-01-01 12:00:02| 10.0|    3|
|2021-01-01 12:00:03| 10.0|    4|
|2021-01-01 12:00:04| 10.0|    5|
|2021-01-01 12:00:05| 10.0|    1|
|2021-01-01 12:00:06| 10.0|    2|
|2021-01-01 12:00:07| 10.0|    3|
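
For reproducibility, the two input DataFrames above can be created with something like this (a minimal sketch, assuming a SparkSession named `spark`; timestamps are kept as strings for brevity, casting them to timestamp does not change the logic):

    import spark.implicits._

    val data = Seq(
      ("2021-01-01 12:00:00", 10.0), ("2021-01-01 12:00:01", 10.0),
      ("2021-01-01 12:00:02", 10.0), ("2021-01-01 12:00:03", 10.0),
      ("2021-01-01 12:00:04", 10.0), ("2021-01-01 12:00:05", 10.0),
      ("2021-01-01 12:00:06", 10.0), ("2021-01-01 12:00:07", 10.0)
    ).toDF("timestamp", "value")

    val events = Seq(
      ("2021-01-01 12:00:01", true), ("2021-01-01 12:00:05", true)
    ).toDF("timestamp", "event")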

I have tried with

.withColumn("index",monotonically_increasing_id())

but there is no way to set it back to 0 when joining with another DataFrame. So, any ideas are welcome.
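
For reference, this is roughly what that attempt looks like; monotonically_increasing_id assigns each row a globally unique, ever-growing id, so there is no point at which it can be reset (a sketch, assuming the data DataFrame from above):

    import org.apache.spark.sql.functions.monotonically_increasing_id

    // Ids grow monotonically across the whole DataFrame and never restart,
    // regardless of what is joined in later.
    val attempt = data.withColumn("index", monotonically_increasing_id())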


Solution

  • You can join the data df with the events df on timestamp, then use a conditional cumulative sum over the event column to define groups. Finally, compute a row number partitioned by the group column.

    Something like this:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number, sum, when}

    val result = data.join(
        events,
        Seq("timestamp"),
        "left"                       // keep all data rows; event is null where no event occurred
    ).withColumn(
        "group",                     // cumulative count of events seen so far defines the group
        sum(when(col("event"), 1).otherwise(0)).over(Window.orderBy("timestamp"))
    ).withColumn(
        "index",                     // restart numbering within each group
        row_number().over(Window.partitionBy("group").orderBy("timestamp"))
    ).drop("group", "event")
    
    result.show
    //+-------------------+-----+-----+
    //|          timestamp|value|index|
    //+-------------------+-----+-----+
    //|2021-01-01 12:00:00| 10.0|    1|
    //|2021-01-01 12:00:01| 10.0|    1|
    //|2021-01-01 12:00:02| 10.0|    2|
    //|2021-01-01 12:00:03| 10.0|    3|
    //|2021-01-01 12:00:04| 10.0|    4|
    //|2021-01-01 12:00:05| 10.0|    1|
    //|2021-01-01 12:00:06| 10.0|    2|
    //|2021-01-01 12:00:07| 10.0|    3|
    //+-------------------+-----+-----+
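
    One thing to note: Window.orderBy("timestamp") without a partitionBy pulls all rows into a single partition to compute the cumulative sum, so Spark will warn about it and it can become a bottleneck on large data. If you want to see how the grouping works, you can show the intermediate group column before it is dropped (a small inspection sketch using the same join and window as above):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, sum, when}

    // group is 0 before the first event, then increments at every event row:
    // 12:00:00 -> 0, 12:00:01..12:00:04 -> 1, 12:00:05..12:00:07 -> 2
    data.join(events, Seq("timestamp"), "left")
      .withColumn("group", sum(when(col("event"), 1).otherwise(0)).over(Window.orderBy("timestamp")))
      .show()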