apache-spark pyspark bigdata spark-streaming spark-structured-streaming

How to calculate time between events in a group


How to find the time between events in a group?

For example, I have a streaming source (Kafka) that delivers many columns. The stream is read into Spark, preprocessed and cleaned, and only these four columns are kept: "ClientTimestamp", "sensor_type", "activity", "User_detail".

Now I want to calculate the total time for which the critical activity lasted for each user.

ClientTimestamp         Sensor_type      activity          User_detail
4/11/2021 10:00:00      ultrasonic       critical          user_A
4/11/2021 10:00:00      ultrasonic       normal            user_B            
4/11/2021 10:03:00      ultrasonic       normal            user_A
4/11/2021 10:05:00      ultrasonic       critical          user_B
4/11/2021 10:06:00      ultrasonic       critical          user_A
4/11/2021 10:07:00      ultrasonic       critical          user_A
4/11/2021 10:08:00      ultrasonic       critical          user_B
4/11/2021 10:09:00      ultrasonic       critical          user_B

So for user_A, the total time across all critical activity is calculated by taking the difference between each pair of consecutive critical events and summing those differences:

(10:06:00 - 10:00:00) + (10:07:00 - 10:06:00)
 Therefore, for user_A the critical activity lasted for a total of (6 + 1) = 7 minutes.

Similarly, for user_B:

(10:08:00 - 10:05:00) + (10:09:00 - 10:08:00)
 For user_B the critical activity lasted for a total of (3 + 1) = 4 minutes.
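
The calculation described here can be checked with a short, Spark-free sketch. The helper name `total_critical_minutes` and the in-memory `events` list are illustrative assumptions and not part of the streaming job. The date format is assumed to be month/day/year, which does not affect the result for these rows.

```python
from collections import defaultdict
from datetime import datetime

# Sample rows from the question: (ClientTimestamp, activity, User_detail).
events = [
    ("4/11/2021 10:00:00", "critical", "user_A"),
    ("4/11/2021 10:00:00", "normal", "user_B"),
    ("4/11/2021 10:03:00", "normal", "user_A"),
    ("4/11/2021 10:05:00", "critical", "user_B"),
    ("4/11/2021 10:06:00", "critical", "user_A"),
    ("4/11/2021 10:07:00", "critical", "user_A"),
    ("4/11/2021 10:08:00", "critical", "user_B"),
    ("4/11/2021 10:09:00", "critical", "user_B"),
]


def total_critical_minutes(rows):
    """Sum the gaps between consecutive critical events per user, in minutes."""
    times = defaultdict(list)
    for raw, activity, user in rows:
        if activity == "critical":  # "normal" rows are ignored
            times[user].append(datetime.strptime(raw, "%m/%d/%Y %H:%M:%S"))

    totals = {}
    for user, stamps in times.items():
        stamps.sort()
        # Pair each event with the next one and add up the differences.
        gaps = (later - earlier for earlier, later in zip(stamps, stamps[1:]))
        totals[user] = sum(gap.total_seconds() for gap in gaps) / 60
    return totals


print(total_critical_minutes(events))
```

For the sample rows this gives 7 minutes for user_A (10:00 to 10:06 is 6 minutes, plus 1 more to 10:07) and 4 minutes for user_B.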

For each window, I want to call a custom function that calculates the total time. How can I apply a function to the groups produced by the windowed groupby?

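from pyspark.sql.functions import col, window

# calculate_time is the custom per-group function I would like to apply.
df = df.withWatermark("clientTimestamp", "10 minutes") \
       .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity')) \
       .apply(calculate_time)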

Solution

  • This can be solved by taking the difference between the maximum and minimum timestamp for each User_detail within the window, because the gaps between consecutive events sum to exactly that difference. A filter on the activity column drops the "normal" rows first.

    I do not see why a custom function such as "calculate_time" is required here. Please note that I am not completely familiar with Python syntax, but your code could look like this:

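    from pyspark.sql.functions import col, window, max as max_, min as min_

    # Casting the timestamps to long yields epoch seconds, so time_difference is in seconds.
    df = df \
      .filter(df.activity == "critical") \
      .withWatermark("clientTimestamp", "10 minutes") \
      .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
      .agg((max_("clientTimestamp").cast("long") - min_("clientTimestamp").cast("long")).alias("time_difference"))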
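
To see what this aggregation computes, here is a plain-Python sketch (not Spark code) that mimics it on the question's critical rows. It assigns each event to a 10-minute tumbling window and takes the latest minus the earliest timestamp per (window, user). The names `window_start` and `max_minus_min` are hypothetical, and the windows are assumed to be aligned to round 10-minute boundaries.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)

# Critical rows only, i.e. what remains after the filter: (clientTimestamp, User_detail).
critical = [
    ("4/11/2021 10:00:00", "user_A"),
    ("4/11/2021 10:05:00", "user_B"),
    ("4/11/2021 10:06:00", "user_A"),
    ("4/11/2021 10:07:00", "user_A"),
    ("4/11/2021 10:08:00", "user_B"),
    ("4/11/2021 10:09:00", "user_B"),
]


def window_start(ts):
    """Floor a timestamp to the start of its 10-minute tumbling window."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + ((ts - midnight) // WINDOW) * WINDOW


def max_minus_min(rows):
    """Per (window start, user): latest minus earliest event time, in seconds."""
    groups = defaultdict(list)
    for raw, user in rows:
        ts = datetime.strptime(raw, "%m/%d/%Y %H:%M:%S")
        groups[(window_start(ts), user)].append(ts)
    return {key: (max(v) - min(v)).total_seconds() for key, v in groups.items()}


for (start, user), seconds in sorted(max_minus_min(critical).items()):
    print(start, user, seconds)
```

All sample events fall into the 10:00 window, so the sketch reports 420 seconds for user_A and 240 seconds for user_B. One consequence of grouping by tumbling windows is that a run of critical events crossing a window boundary is split into separate groups, so the gap across the boundary is not counted.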