I have time series data in CSV from vehicle with following information:
The data looks like this:
trip-id | timestamp | speed
001 | 1538204192 | 44.55
001 | 1538204193 | 47.20 <-- start of brake
001 | 1538204194 | 42.14
001 | 1538204195 | 39.20
001 | 1538204196 | 35.30
001 | 1538204197 | 32.22 <-- end of brake
001 | 1538204198 | 34.80
001 | 1538204199 | 37.10
...
001 | 1538204221 | 55.30
001 | 1538204222 | 57.20 <-- start of brake
001 | 1538204223 | 54.60
001 | 1538204224 | 52.15
001 | 1538204225 | 49.27
001 | 1538204226 | 47.89 <-- end of brake
001 | 1538204227 | 50.57
001 | 1538204228 | 53.72
...
A braking event occurs when there's a decrease in speed
in 2 consecutive records based on timestamp
.
I want to extract the braking events from the data in terms of event start timestamp
, end timestamp
, start speed
& end speed
.
+-------------+---------------+-------------+-----------+---------+
| breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193| 1538204193| 1538204196| 47.2| 35.3|
|0011538204222| 1538204222| 1538204225| 57.2| 49.27|
+-------------+---------------+-------------+-----------+---------+
Here's my take:
trip-id
, ordered by timestamp
.lag
to move over consecutive rows and calculate speed difference.I am stuck here as i do not have a key
belonging to same group so i can apply key based aggregation.
My question is:
How can I map to add a key
column based on the difference in timestamp? So if 2 records have a difference of 1 seconds, they should have a common key. That way, I can reduce a group based on the newly added key.
Is there any better & more optimized way to achieve this? My approach could be very inefficient as it relies on row by row comparisons. What are the other possible ways to detect these kind of "sub-events" (e.g braking events) in a data-stream belonging to a specific event (data from single vehicle trip)?
Thanks in advance!
Appendix:
Hope this helps. Scala code.
Output
+-------------+---------------+-------------+-----------+---------+
| breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193| 1538204193| 1538204196| 47.2| 35.3|
|0011538204222| 1538204222| 1538204225| 57.2| 49.27|
+-------------+---------------+-------------+-----------+---------+
CODE
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions._
scala> df.show
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
| 001|1538204192|44.55|
| 001|1538204193| 47.2|
| 001|1538204194|42.14|
| 001|1538204195| 39.2|
| 001|1538204196| 35.3|
| 001|1538204197|32.22|
| 001|1538204198| 34.8|
| 001|1538204199| 37.1|
| 001|1538204221| 55.3|
| 001|1538204222| 57.2|
| 001|1538204223| 54.6|
| 001|1538204224|52.15|
| 001|1538204225|49.27|
| 001|1538204226|47.89|
| 001|1538204227|50.57|
| 001|1538204228|53.72|
+-------+----------+-----+
val overColumns = Window.partitionBy("trip-id").orderBy("timestamp")
val breaksDF = df
.withColumn("speeddiff", lead("speed", 1).over(overColumns) - $"speed")
.withColumn("breaking", when($"speeddiff" < 0, 1).otherwise(0))
scala> breaksDF.show
+-------+----------+-----+-------------------+--------+
|trip-id| timestamp|speed| speeddiff|breaking|
+-------+----------+-----+-------------------+--------+
| 001|1538204192|44.55| 2.6500000000000057| 0|
| 001|1538204193| 47.2| -5.060000000000002| 1|
| 001|1538204194|42.14|-2.9399999999999977| 1|
| 001|1538204195| 39.2|-3.9000000000000057| 1|
| 001|1538204196| 35.3|-3.0799999999999983| 1|
| 001|1538204197|32.22| 2.5799999999999983| 0|
| 001|1538204198| 34.8| 2.3000000000000043| 0|
| 001|1538204199| 37.1| 18.199999999999996| 0|
| 001|1538204221| 55.3| 1.9000000000000057| 0|
| 001|1538204222| 57.2|-2.6000000000000014| 1|
| 001|1538204223| 54.6| -2.450000000000003| 1|
| 001|1538204224|52.15|-2.8799999999999955| 1|
| 001|1538204225|49.27|-1.3800000000000026| 1|
| 001|1538204226|47.89| 2.6799999999999997| 0|
| 001|1538204227|50.57| 3.1499999999999986| 0|
| 001|1538204228|53.72| null| 0|
+-------+----------+-----+-------------------+--------+
val outputDF = breaksDF
.withColumn("breakevent",
when(($"breaking" - lag($"breaking", 1).over(overColumns)) === 1, "start of break")
.when(($"breaking" - lead($"breaking", 1).over(overColumns)) === 1, "end of break"))
scala> outputDF.show
+-------+----------+-----+-------------------+--------+--------------+
|trip-id| timestamp|speed| speeddiff|breaking| breakevent|
+-------+----------+-----+-------------------+--------+--------------+
| 001|1538204192|44.55| 2.6500000000000057| 0| null|
| 001|1538204193| 47.2| -5.060000000000002| 1|start of break|
| 001|1538204194|42.14|-2.9399999999999977| 1| null|
| 001|1538204195| 39.2|-3.9000000000000057| 1| null|
| 001|1538204196| 35.3|-3.0799999999999983| 1| end of break|
| 001|1538204197|32.22| 2.5799999999999983| 0| null|
| 001|1538204198| 34.8| 2.3000000000000043| 0| null|
| 001|1538204199| 37.1| 18.199999999999996| 0| null|
| 001|1538204221| 55.3| 1.9000000000000057| 0| null|
| 001|1538204222| 57.2|-2.6000000000000014| 1|start of break|
| 001|1538204223| 54.6| -2.450000000000003| 1| null|
| 001|1538204224|52.15|-2.8799999999999955| 1| null|
| 001|1538204225|49.27|-1.3800000000000026| 1| end of break|
| 001|1538204226|47.89| 2.6799999999999997| 0| null|
| 001|1538204227|50.57| 3.1499999999999986| 0| null|
| 001|1538204228|53.72| null| 0| null|
+-------+----------+-----+-------------------+--------+--------------+
scala> outputDF.filter("breakevent is not null").select("trip-id", "timestamp", "speed", "breakevent").show
+-------+----------+-----+--------------+
|trip-id| timestamp|speed| breakevent|
+-------+----------+-----+--------------+
| 001|1538204193| 47.2|start of break|
| 001|1538204196| 35.3| end of break|
| 001|1538204222| 57.2|start of break|
| 001|1538204225|49.27| end of break|
+-------+----------+-----+--------------+
outputDF.filter("breakevent is not null").withColumn("breakID",
when($"breakevent" === "start of break", concat($"trip-id",$"timestamp"))
.when($"breakevent" === "end of break", concat($"trip-id", lag($"timestamp", 1).over(overColumns))))
.groupBy("breakID").agg(first($"timestamp") as "start timestamp", last($"timestamp") as "end timestamp", first($"speed") as "start speed", last($"speed") as "end speed").show
+-------------+---------------+-------------+-----------+---------+
| breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193| 1538204193| 1538204196| 47.2| 35.3|
|0011538204222| 1538204222| 1538204225| 57.2| 49.27|
+-------------+---------------+-------------+-----------+---------+