Tags: scala, apache-spark, apache-spark-sql, user-defined-functions, rolling-average

How to get the hourly average of occurrences of an event using a timestamp/date list in Scala


Sample Data for Timestamp

2018-04-07 07:07:17
2018-04-07 07:32:27

2018-04-07 08:36:44
2018-04-07 08:38:00
2018-04-07 08:39:29

2018-04-08 01:43:08
2018-04-08 01:43:55

2018-04-09 07:52:31
2018-04-09 07:52:42

2019-01-24 11:52:31

2019-01-24 12:52:42
2019-01-25 12:52:42

Expected Output: (2+3+2+2+1+3)/6 = 1.66

I have to do this for weekly and monthly as well, but I should be able to infer those from the hourly logic.

  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    val dateString = input(0).toString()

    val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")
    val zdt = ZonedDateTime.parse(dateString, dtf.withZone(ZoneId.systemDefault))

    // zdt is a ZonedDateTime
    // ...
  }

Inside update I can use all the methods of ZonedDateTime on zdt.
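
For context, a minimal self-contained sketch (outside the UDAF, with the imports spelled out) of parsing one of these strings and truncating it to the hour; the pattern assumes the string comes from java.sql.Timestamp.toString(), and truncatedTo/ChronoUnit are standard java.time APIs:

    import java.time.{ZoneId, ZonedDateTime}
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit

    // parse a timestamp string (as produced by Timestamp.toString) in the system time zone
    val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")
    val zdt = ZonedDateTime.parse("2018-04-07 07:07:17.0", dtf.withZone(ZoneId.systemDefault))

    // truncate to the start of the hour to get an hourly bucket key
    val hourBucket = zdt.truncatedTo(ChronoUnit.HOURS)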


Solution

  • I tried to solve it using the approach below.

    Please note that the code runs in IST (GMT+5:30); therefore the timestamps 2018-04-07 07:07:17 and 2018-04-07 07:32:27 fall into different hour buckets (the first in 6:30 - 7:30, the second in 7:30 - 8:30).

    Code

    1. Read the data
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

        val spark = sqlContext.sparkSession
        val implicits = spark.implicits
        import implicits._

        val data =
          """
            |2018-04-07 07:07:17
            |2018-04-07 07:32:27
            |2018-04-07 08:36:44
            |2018-04-07 08:38:00
            |2018-04-07 08:39:29
            |2018-04-08 01:43:08
            |2018-04-08 01:43:55
            |2018-04-09 07:52:31
            |2018-04-09 07:52:42
            |2019-01-24 11:52:31
            |2019-01-24 12:52:42
            |2019-01-25 12:52:42
          """.stripMargin

        // read the timestamps as a single TimestampType column
        val df = spark.read
          .schema(StructType(Array(StructField("date_time", DataTypes.TimestampType))))
          .csv(data.split(System.lineSeparator()).toSeq.toDS())
        df.show(false)
        df.printSchema()
    

    Result -

    +-------------------+
    |date_time          |
    +-------------------+
    |2018-04-07 07:07:17|
    |2018-04-07 07:32:27|
    |2018-04-07 08:36:44|
    |2018-04-07 08:38:00|
    |2018-04-07 08:39:29|
    |2018-04-08 01:43:08|
    |2018-04-08 01:43:55|
    |2018-04-09 07:52:31|
    |2018-04-09 07:52:42|
    |2019-01-24 11:52:31|
    |2019-01-24 12:52:42|
    |2019-01-25 12:52:42|
    +-------------------+
    root
     |-- date_time: timestamp (nullable = true)
    
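
    If you prefer, the same DataFrame can also be built without the CSV round trip; a small alternative sketch (assuming Spark 2.2+ for to_timestamp):

        // build the DataFrame directly from a Seq of strings
        val df2 = Seq(
            "2018-04-07 07:07:17", "2018-04-07 07:32:27", "2018-04-07 08:36:44",
            "2018-04-07 08:38:00", "2018-04-07 08:39:29", "2018-04-08 01:43:08",
            "2018-04-08 01:43:55", "2018-04-09 07:52:31", "2018-04-09 07:52:42",
            "2019-01-24 11:52:31", "2019-01-24 12:52:42", "2019-01-25 12:52:42")
          .toDF("raw")
          .select(to_timestamp(col("raw"), "yyyy-MM-dd HH:mm:ss").as("date_time"))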
    2. Bucketize the data and find the count for each hour
        val hour = 60 * 60
        // convert the time into unix epoch seconds and bucket it by hour
        val processedDF = df.withColumn("unix_epoch", unix_timestamp(col("date_time")))
          .withColumn("hour_bucket", floor(col("unix_epoch") / hour))
          .groupBy("hour_bucket")
          .count()

        processedDF.show(false)
    

    Result-

    +-----------+-----+
    |hour_bucket|count|
    +-----------+-----+
    |423073     |1    |
    |423074     |1    |
    |423075     |3    |
    |423092     |2    |
    |423122     |2    |
    |430087     |1    |
    |430086     |1    |
    |430111     |1    |
    +-----------+-----+
    
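
    Since each hour_bucket is just a UTC epoch hour, it can be mapped back to wall-clock time to see the half-hour shift in IST mentioned above; a small sketch using java.time (zone id Asia/Kolkata assumed for IST):

        import java.time.{Instant, ZoneId}

        // start of the first bucket in the table above (423073), rendered in IST
        val bucketStart = Instant.ofEpochSecond(423073L * 3600).atZone(ZoneId.of("Asia/Kolkata"))
        // 2018-04-07T06:30+05:30[Asia/Kolkata] -> the 06:30 - 07:30 IST bucket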
    3. Find the hourly average
        // average event count across the hour buckets
        processedDF.agg(avg("count")).show(false)
    

    Result-

    +----------+
    |avg(count)|
    +----------+
    |1.5       |
    +----------+
    
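
    For the weekly and monthly averages mentioned in the question, the same pattern should work with a coarser bucket; a sketch assuming Spark 2.3+ where date_trunc is available (note that date_trunc uses the session time zone, unlike the raw epoch bucketing above):

        // weekly: truncate each timestamp to the start of its week, count per week, then average
        df.groupBy(date_trunc("week", col("date_time")).as("week_bucket"))
          .count()
          .agg(avg("count"))
          .show(false)

        // monthly: the same idea with month-level truncation
        df.groupBy(date_trunc("month", col("date_time")).as("month_bucket"))
          .count()
          .agg(avg("count"))
          .show(false)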

    Hope this helps!