Tags: dataframe, apache-spark, pyspark, apache-spark-sql, rdd

Spark: How to aggregate/reduce records based on time difference?


I have time-series data in CSV format from a vehicle, with the following fields:

  • trip-id
  • timestamp
  • speed

The data looks like this:

trip-id | timestamp  | speed

001     | 1538204192 | 44.55
001     | 1538204193 | 47.20 <-- start of brake
001     | 1538204194 | 42.14
001     | 1538204195 | 39.20
001     | 1538204196 | 35.30
001     | 1538204197 | 32.22 <-- end of brake
001     | 1538204198 | 34.80
001     | 1538204199 | 37.10
...
001     | 1538204221 | 55.30
001     | 1538204222 | 57.20 <-- start of brake
001     | 1538204223 | 54.60
001     | 1538204224 | 52.15
001     | 1538204225 | 49.27
001     | 1538204226 | 47.89 <-- end of brake
001     | 1538204227 | 50.57
001     | 1538204228 | 53.72
...

A braking event occurs when there's a decrease in speed in 2 consecutive records based on timestamp.

I want to extract the braking events from the data, each described by its start timestamp, end timestamp, start speed, and end speed:

+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+

Here's my take:

  1. Define a window spec partitioned by trip-id and ordered by timestamp.
  2. Apply lag over that window to compare consecutive rows and compute the speed difference.
  3. Filter out records with a positive speed difference, as I am interested in braking events only.
  4. Now that only records belonging to braking events remain, group the records that belong to the same event. I guess I can do this based on the timestamp difference: if two records are 1 second apart, they belong to the same braking event.
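Steps 1 to 3 can be sketched in plain Python (no Spark needed) to make the row-by-row logic concrete. This is only an illustration under assumptions: the function name `decelerating_rows` and the tuple layout `(trip_id, timestamp, speed)` are hypothetical, and the rows are copied from the sample data above.

```python
# Sample rows (trip_id, timestamp, speed) copied from the question.
rows = [
    ("001", 1538204192, 44.55), ("001", 1538204193, 47.20),
    ("001", 1538204194, 42.14), ("001", 1538204195, 39.20),
    ("001", 1538204196, 35.30), ("001", 1538204197, 32.22),
    ("001", 1538204198, 34.80), ("001", 1538204199, 37.10),
]


def decelerating_rows(rows):
    """Return the rows whose speed is lower than the previous row of the same trip."""
    # Step 1: "partition" by trip and order by timestamp.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    kept = []
    prev = None
    for row in ordered:
        # Step 2: "lag" - compare with the previous row, but only within one trip.
        # Step 3: keep the row only if the speed difference is negative.
        if prev is not None and prev[0] == row[0] and row[2] - prev[2] < 0:
            kept.append(row)
        prev = row
    return kept


braking = decelerating_rows(rows)
for trip_id, timestamp, speed in braking:
    print(trip_id, timestamp, speed)
```

One thing this makes visible: with a lag-based difference, the record where braking starts (1538204193 here) is not kept, because its own difference to the previous row is positive. The start speed then has to be taken from the row just before the first kept row.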

I am stuck here: the records of one event have no common key, so I cannot apply a key-based aggregation.

My question is:

  1. How can I add a key column based on the difference in timestamp? If two records are 1 second apart, they should share a common key. That way, I can reduce each group based on the newly added key.

  2. Is there a better, more optimized way to achieve this? My approach could be very inefficient, as it relies on row-by-row comparisons. What other ways are there to detect this kind of "sub-event" (e.g. braking events) in a data stream belonging to a specific event (data from a single vehicle trip)?
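For question 1, one common pattern is to flag every row that starts a new group (the gap to the previous row exceeds 1 second) and then take a running sum of those flags; the running sum is the key. Below is a minimal plain-Python sketch of that idea, not Spark code. The names `assign_event_keys`, `summarize`, and `max_gap` are hypothetical, and the input is assumed to be the already-filtered "speed decreased" rows. In Spark, the same two steps would typically be a `lag` on the timestamp followed by a `sum` of the flag over the same window, with the aggregation grouped by trip-id plus the key.

```python
from itertools import accumulate

# Assumed input: rows already reduced to "speed decreased" records,
# as (trip_id, timestamp, speed).
braking = [
    ("001", 1538204194, 42.14), ("001", 1538204195, 39.20),
    ("001", 1538204196, 35.30), ("001", 1538204197, 32.22),
    ("001", 1538204223, 54.60), ("001", 1538204224, 52.15),
    ("001", 1538204225, 49.27), ("001", 1538204226, 47.89),
]


def assign_event_keys(rows, max_gap=1):
    """Prefix each row with an event number.

    A new event starts at the first row, when the trip changes, or when the
    gap to the previous timestamp is larger than max_gap seconds.
    """
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    starts = [
        1 if i == 0
        or row[0] != ordered[i - 1][0]
        or row[1] - ordered[i - 1][1] > max_gap
        else 0
        for i, row in enumerate(ordered)
    ]
    keys = accumulate(starts)  # running sum of the "new event" flags
    return [(key,) + row for key, row in zip(keys, ordered)]


def summarize(keyed_rows):
    """Reduce keyed rows to one record per event (first and last row of each key)."""
    events = {}
    for key, trip_id, timestamp, speed in keyed_rows:
        event = events.setdefault(
            key, {"trip": trip_id, "start_ts": timestamp, "start_speed": speed}
        )
        # Rows arrive in timestamp order, so the last write wins as the end.
        event["end_ts"], event["end_speed"] = timestamp, speed
    return list(events.values())


events = summarize(assign_event_keys(braking))
for event in events:
    print(event)
```

The key is just a counter, so it carries no meaning of its own; a readable ID such as trip-id plus start timestamp can be derived after the aggregation.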

Thanks in advance!



Solution

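  • Hope this helps. The Scala code below uses the lead and lag window functions to mark where each braking event starts and ends, then groups each start/end pair under a shared ID.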

    Output

    +-------------+---------------+-------------+-----------+---------+
    |      breakID|start timestamp|end timestamp|start speed|end speed|
    +-------------+---------------+-------------+-----------+---------+
    |0011538204193|     1538204193|   1538204196|       47.2|     35.3|
    |0011538204222|     1538204222|   1538204225|       57.2|    49.27|
    +-------------+---------------+-------------+-----------+---------+
    

    CODE

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._  // enables the $"column" syntax (already in scope in spark-shell)
    
    scala> df.show
    +-------+----------+-----+
    |trip-id| timestamp|speed|
    +-------+----------+-----+
    |    001|1538204192|44.55|
    |    001|1538204193| 47.2|
    |    001|1538204194|42.14|
    |    001|1538204195| 39.2|
    |    001|1538204196| 35.3|
    |    001|1538204197|32.22|
    |    001|1538204198| 34.8|
    |    001|1538204199| 37.1|
    |    001|1538204221| 55.3|
    |    001|1538204222| 57.2|
    |    001|1538204223| 54.6|
    |    001|1538204224|52.15|
    |    001|1538204225|49.27|
    |    001|1538204226|47.89|
    |    001|1538204227|50.57|
    |    001|1538204228|53.72|
    +-------+----------+-----+
    
    val overColumns = Window.partitionBy("trip-id").orderBy("timestamp")
    val breaksDF = df
      .withColumn("speeddiff", lead("speed", 1).over(overColumns) - $"speed")
      .withColumn("breaking", when($"speeddiff" < 0, 1).otherwise(0))
    
    scala> breaksDF.show
    +-------+----------+-----+-------------------+--------+
    |trip-id| timestamp|speed|          speeddiff|breaking|
    +-------+----------+-----+-------------------+--------+
    |    001|1538204192|44.55| 2.6500000000000057|       0|
    |    001|1538204193| 47.2| -5.060000000000002|       1|
    |    001|1538204194|42.14|-2.9399999999999977|       1|
    |    001|1538204195| 39.2|-3.9000000000000057|       1|
    |    001|1538204196| 35.3|-3.0799999999999983|       1|
    |    001|1538204197|32.22| 2.5799999999999983|       0|
    |    001|1538204198| 34.8| 2.3000000000000043|       0|
    |    001|1538204199| 37.1| 18.199999999999996|       0|
    |    001|1538204221| 55.3| 1.9000000000000057|       0|
    |    001|1538204222| 57.2|-2.6000000000000014|       1|
    |    001|1538204223| 54.6| -2.450000000000003|       1|
    |    001|1538204224|52.15|-2.8799999999999955|       1|
    |    001|1538204225|49.27|-1.3800000000000026|       1|
    |    001|1538204226|47.89| 2.6799999999999997|       0|
    |    001|1538204227|50.57| 3.1499999999999986|       0|
    |    001|1538204228|53.72|               null|       0|
    +-------+----------+-----+-------------------+--------+
    
    
    val outputDF = breaksDF
      .withColumn("breakevent", 
        when(($"breaking" - lag($"breaking", 1).over(overColumns)) === 1, "start of break")
        .when(($"breaking" - lead($"breaking", 1).over(overColumns)) === 1, "end of break"))
    
    scala> outputDF.show
    +-------+----------+-----+-------------------+--------+--------------+
    |trip-id| timestamp|speed|          speeddiff|breaking|    breakevent|
    +-------+----------+-----+-------------------+--------+--------------+
    |    001|1538204192|44.55| 2.6500000000000057|       0|          null|
    |    001|1538204193| 47.2| -5.060000000000002|       1|start of break|
    |    001|1538204194|42.14|-2.9399999999999977|       1|          null|
    |    001|1538204195| 39.2|-3.9000000000000057|       1|          null|
    |    001|1538204196| 35.3|-3.0799999999999983|       1|  end of break|
    |    001|1538204197|32.22| 2.5799999999999983|       0|          null|
    |    001|1538204198| 34.8| 2.3000000000000043|       0|          null|
    |    001|1538204199| 37.1| 18.199999999999996|       0|          null|
    |    001|1538204221| 55.3| 1.9000000000000057|       0|          null|
    |    001|1538204222| 57.2|-2.6000000000000014|       1|start of break|
    |    001|1538204223| 54.6| -2.450000000000003|       1|          null|
    |    001|1538204224|52.15|-2.8799999999999955|       1|          null|
    |    001|1538204225|49.27|-1.3800000000000026|       1|  end of break|
    |    001|1538204226|47.89| 2.6799999999999997|       0|          null|
    |    001|1538204227|50.57| 3.1499999999999986|       0|          null|
    |    001|1538204228|53.72|               null|       0|          null|
    +-------+----------+-----+-------------------+--------+--------------+
    
    
    scala> outputDF.filter("breakevent is not null").select("trip-id", "timestamp", "speed", "breakevent").show
    +-------+----------+-----+--------------+
    |trip-id| timestamp|speed|    breakevent|
    +-------+----------+-----+--------------+
    |    001|1538204193| 47.2|start of break|
    |    001|1538204196| 35.3|  end of break|
    |    001|1538204222| 57.2|start of break|
    |    001|1538204225|49.27|  end of break|
    +-------+----------+-----+--------------+
    
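    // Keep only the start/end rows. In this filtered set the row before each
    // "end of break" is its "start of break", so lag() yields the start timestamp
    // and both rows of an event share one breakID.
    // Note: first/last depend on row order within each group, which Spark does
    // not guarantee after a groupBy.
    outputDF.filter("breakevent is not null").withColumn("breakID", 
      when($"breakevent" === "start of break", concat($"trip-id",$"timestamp"))
      .when($"breakevent" === "end of break", concat($"trip-id", lag($"timestamp", 1).over(overColumns))))
      .groupBy("breakID")
      .agg(
        first($"timestamp") as "start timestamp",
        last($"timestamp") as "end timestamp",
        first($"speed") as "start speed",
        last($"speed") as "end speed")
      .show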
    
    
    +-------------+---------------+-------------+-----------+---------+
    |      breakID|start timestamp|end timestamp|start speed|end speed|
    +-------------+---------------+-------------+-----------+---------+
    |0011538204193|     1538204193|   1538204196|       47.2|     35.3|
    |0011538204222|     1538204222|   1538204225|       57.2|    49.27|
    +-------------+---------------+-------------+-----------+---------+
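To check the logic of this answer without a Spark session, the same lead-based flagging can be mirrored in plain Python. This is a rough sketch under assumptions, not a translation of the Scala API: the function name `braking_events` is hypothetical, the rows are assumed to belong to a single trip and to be sorted by timestamp, and `itertools.groupby` stands in for the start/end marking by collapsing consecutive rows with the same flag into one run.

```python
from itertools import groupby

# (trip_id, timestamp, speed) rows from the question, one trip, sorted by time.
rows = [
    ("001", 1538204192, 44.55), ("001", 1538204193, 47.20),
    ("001", 1538204194, 42.14), ("001", 1538204195, 39.20),
    ("001", 1538204196, 35.30), ("001", 1538204197, 32.22),
    ("001", 1538204198, 34.80), ("001", 1538204199, 37.10),
    ("001", 1538204221, 55.30), ("001", 1538204222, 57.20),
    ("001", 1538204223, 54.60), ("001", 1538204224, 52.15),
    ("001", 1538204225, 49.27), ("001", 1538204226, 47.89),
    ("001", 1538204227, 50.57), ("001", 1538204228, 53.72),
]


def braking_events(rows):
    """Return (break_id, start_ts, end_ts, start_speed, end_speed) per braking run."""
    # "lead": a row is flagged when the NEXT row has a lower speed.
    # The last row has no successor, so it is never flagged.
    flags = [nxt[2] < cur[2] for cur, nxt in zip(rows, rows[1:])] + [False]
    events = []
    # groupby yields one group per run of consecutive equal flags.
    for is_braking, run in groupby(zip(flags, rows), key=lambda pair: pair[0]):
        if not is_braking:
            continue
        members = [row for _, row in run]
        first, last = members[0], members[-1]
        break_id = first[0] + str(first[1])  # trip-id concatenated with start timestamp
        events.append((break_id, first[1], last[1], first[2], last[2]))
    return events


for event in braking_events(rows):
    print(event)
```

On the sample data this yields the same two events as the table above. Two properties carry over from the lead-based approach: gaps in the timestamps are not checked (only the ordering is used), and the reported end row is the last row that is followed by a lower speed. The lowest speed of each event is therefore on the next row (1538204197 with 32.22 for the first event, which is where the question marks the end of the brake).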