apache-spark, apache-spark-sql, window-functions

Group by key and find the previous timestamp of an event that occurred in a specific time window efficiently with Spark/Scala


Note: my groups can contain up to 5-10K rows each for the aggregation, so efficient code is highly desirable.

My Data

// in spark-shell; a standalone application would also need `import spark.implicits._` for toDF
val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-23 16:58:08|Success|
|   user2|iphone|2017-12-23 16:58:12|Success|
|   user2|iphone|2017-12-23 16:58:20|Success|
|   user2|iphone|2017-12-23 16:58:25|Success|
|   user2|iphone|2017-12-23 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

What I want
A grouping by (username, device) that returns only the row with the latest attempt_at, together with the timestamp of the attempt before it:

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

Exceptions in desired output:
As mentioned, the previous attempt has to fall within a specific time window. In the input dataset below, the last row has the latest timestamp, on 23rd December. If the window is "going back 1 day", then the 'previous_attempt_at' column should be null, since there are no events on the previous day, 22nd December. It all depends on the input timestamp range.

//Initial Data
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-20 16:58:08|Success|
|   user2|iphone|2017-12-20 16:58:12|Success|
|   user2|iphone|2017-12-20 16:58:20|Success|
|   user2|iphone|2017-12-20 16:58:25|Success|
|   user2|iphone|2017-12-20 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

// Desired Output
Again a grouping by (username, device) for the latest attempt, but this time previous_attempt_at is null because no event falls within the window:

    +--------+------+-------------------+-------+-------------------+
    |username|device|         attempt_at|   stat|previous_attempt_at|
    +--------+------+-------------------+-------+-------------------+
    |   user2|iphone|2017-12-23 16:58:45|Success|               null|
    +--------+------+-------------------+-------+-------------------+

What I have

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// look back over the previous hour (3600 seconds), excluding the current row
val w = Window.partitionBy("username", "device")
  .orderBy(col("attempt_at").cast("timestamp").cast("long"))
  .rangeBetween(-3600, -1)

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:08|Success|               null|
|   user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
|   user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
|   user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
|   user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

Notes: the code I have runs the window over every row in each user group, which is highly inefficient at large scale, and it still doesn't isolate the latest attempt. I don't need any of the rows except the last one per group.
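Conceptually, this is the kind of per-group reduction I'm after (an illustrative sketch using row_number, not my actual code; it reuses the Window and functions imports from above):

// illustrative only: reduce each (username, device) group to its latest row;
// string order matches chronological order for this timestamp format
val latest = df1
  .withColumn("rn", row_number().over(
    Window.partitionBy("username", "device")
      .orderBy(col("attempt_at").desc)))
  .where(col("rn") === 1)   // keep only the latest attempt per group
  .drop("rn")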


Solution

  • All you need is an additional groupBy and aggregation, but before that you need the collect_list function to cumulatively collect the previous dates, a udf function to check whether each previous attempt_at falls within the time limit, and a struct of the three columns ("attempt_at", "stat", "previous_attempt_at") so that max can select the last row per group (a struct compares its fields left to right, so with attempt_at first, max picks the row with the latest timestamp):

    import org.apache.spark.sql.functions._
    import java.time._
    import java.time.temporal._
    import java.time.format._

    // returns the most recent earlier timestamp whose whole-day difference
    // from the current attempt is at most 1, or null if there is none
    def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String]) => {
      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
      val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
      // .init drops the current row's own timestamp from the collected list
      val diffDates = timestamps.init.filter(x =>
        LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.DAYS) <= 1)
      if (diffDates.nonEmpty) diffDates.last else null
    })

    import org.apache.spark.sql.expressions._
    val w = Window.partitionBy("username", "device")
      .orderBy(col("attempt_at").cast("timestamp").cast("long"))

    val df2 = df1
      .withColumn("previous_attempt_at",
        durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
      // pack the columns into a struct so that max selects the row with the
      // latest attempt_at
      .withColumn("struct", struct(
        col("attempt_at").cast("timestamp").as("attempt_at"),
        col("stat"),
        col("previous_attempt_at")))
      .groupBy("username", "device").agg(max("struct").as("struct"))
      .select(col("username"), col("device"), col("struct.attempt_at"),
        col("struct.stat"), col("struct.previous_attempt_at"))
    

    This should give you, for the latter example, the following:

    +--------+------+---------------------+-------+-------------------+
    |username|device|attempt_at           |stat   |previous_attempt_at|
    +--------+------+---------------------+-------+-------------------+
    |user2   |iphone|2017-12-23 16:58:45.0|Success|null               |
    +--------+------+---------------------+-------+-------------------+
    

    and the following for the original input data:

    +--------+------+---------------------+-------+-------------------+
    |username|device|attempt_at           |stat   |previous_attempt_at|
    +--------+------+---------------------+-------+-------------------+
    |user2   |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
    +--------+------+---------------------+-------+-------------------+
    

    You can change the logic to hours by switching ChronoUnit.DAYS in the udf function to ChronoUnit.HOURS, and so on.
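
    For example, a sketch of the hour-based variant (only the filter line inside the udf changes; the <= 1 bound mirrors the original day-based check):

    // filter on whole hours instead of whole days
    val diffDates = timestamps.init.filter(x =>
      LocalDateTime.parse(x, formatter).until(actualDateTime, ChronoUnit.HOURS) <= 1)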