There is a DataFrame of data like this:
|timestamp          |value|
|2021-01-01 12:00:00| 10.0|
|2021-01-01 12:00:01| 10.0|
|2021-01-01 12:00:02| 10.0|
|2021-01-01 12:00:03| 10.0|
|2021-01-01 12:00:04| 10.0|
|2021-01-01 12:00:05| 10.0|
|2021-01-01 12:00:06| 10.0|
|2021-01-01 12:00:07| 10.0|
and a DataFrame of events like this:
|timestamp          |event|
|2021-01-01 12:00:01| true|
|2021-01-01 12:00:05| true|
Based on that, I'd like to add one more column to the initial DataFrame that is an index of the data rows since the beginning of the most recent event:
|timestamp          |value|index|
|2021-01-01 12:00:00| 10.0| 1|
|2021-01-01 12:00:01| 10.0| 2|
|2021-01-01 12:00:02| 10.0| 3|
|2021-01-01 12:00:03| 10.0| 4|
|2021-01-01 12:00:04| 10.0| 5|
|2021-01-01 12:00:05| 10.0| 1|
|2021-01-01 12:00:06| 10.0| 2|
|2021-01-01 12:00:07| 10.0| 3|
I have tried
.withColumn("index", monotonically_increasing_id())
but there is no way to reset it back to 0 when joining with another DataFrame. Any ideas are welcome.
You can left-join the data df with the events df on timestamp, then use a conditional cumulative sum over the event column to define groups. Finally, partition by the group column and use row_number to assign the index within each group. Something like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val result = data
  .join(events, Seq("timestamp"), "left")
  // running count of events seen so far; rows with no matching event (null after the
  // left join) add 0, so the group id only increments where an event starts
  .withColumn(
    "group",
    sum(when(col("event"), 1).otherwise(0)).over(Window.orderBy("timestamp"))
  )
  // number the rows within each group, ordered by timestamp
  .withColumn(
    "index",
    row_number().over(Window.partitionBy("group").orderBy("timestamp"))
  )
  .drop("group", "event")
result.show
//+-------------------+-----+-----+
//| timestamp|value|index|
//+-------------------+-----+-----+
//|2021-01-01 12:00:00| 10.0| 1|
//|2021-01-01 12:00:01| 10.0| 1|
//|2021-01-01 12:00:02| 10.0| 2|
//|2021-01-01 12:00:03| 10.0| 3|
//|2021-01-01 12:00:04| 10.0| 4|
//|2021-01-01 12:00:05| 10.0| 1|
//|2021-01-01 12:00:06| 10.0| 2|
//|2021-01-01 12:00:07| 10.0| 3|
//+-------------------+-----+-----+
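For completeness, here is a minimal sketch of how the two sample DataFrames could be built to try this end to end. It assumes a SparkSession is in scope as spark (that variable name, and building the frames from in-memory Seqs, are just assumptions for this example):

import java.sql.Timestamp
import spark.implicits._

// sample data frame: one row per second, constant value
val data = Seq(
  "2021-01-01 12:00:00", "2021-01-01 12:00:01", "2021-01-01 12:00:02",
  "2021-01-01 12:00:03", "2021-01-01 12:00:04", "2021-01-01 12:00:05",
  "2021-01-01 12:00:06", "2021-01-01 12:00:07"
).map(ts => (Timestamp.valueOf(ts), 10.0)).toDF("timestamp", "value")

// sample events frame: the timestamps where a new segment starts
val events = Seq(
  "2021-01-01 12:00:01", "2021-01-01 12:00:05"
).map(ts => (Timestamp.valueOf(ts), true)).toDF("timestamp", "event")

One thing to keep in mind: Window.orderBy("timestamp") without a partitionBy clause moves all rows into a single partition to compute the cumulative sum, so Spark logs a performance warning. That is fine for small data like this, but on large datasets you may want to bound that window (for example by a date column) if your data allows it.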