apache-spark, pyspark, apache-spark-sql

How to get count of rows occurring each hour and day of week using Spark dataframe?


I would like to return the count of times an event occurs in each hour of the week (grouped by day and hour). Using a Spark DataFrame, I can get a list of rows with a 'dateOccurred' string column. (The 24th and 17th are Fridays; the 23rd and 16th are Thursdays.)

  • Row(dateOccurred='24-04-2020 10:08:00')

  • Row(dateOccurred='24-04-2020 11:52:00')

  • Row(dateOccurred='24-04-2020 11:35:00')

  • Row(dateOccurred='23-04-2020 15:13:00')

  • Row(dateOccurred='23-04-2020 15:20:00')

  • Row(dateOccurred='23-04-2020 23:52:00')

  • Row(dateOccurred='16-04-2020 15:22:00')

  • Row(dateOccurred='16-04-2020 23:12:00')

  • Row(dateOccurred='16-04-2020 14:28:00')

  • Row(dateOccurred='17-04-2020 10:16:00')

  • Row(dateOccurred='17-04-2020 11:19:00')

  • Row(dateOccurred='17-04-2020 12:52:00')

I would like to transform the result into the following format:

  • ('Friday_10', 2)
  • ('Friday_11', 3)
  • ('Friday_12', 1)
  • ('Thursday_15', 3)
  • ('Thursday_23', 2)
  • ('Thursday_14', 1)

Solution

  • You have to extract the day and hour from your date column. For the day, you have two options: you can use a UDF to extract the weekday's full name, or, if an integer is enough, you can use the built-in PySpark function. After that, concatenate the two columns and do a groupBy + count:

    import datetime
    import pyspark.sql.functions as f
    from pyspark.sql.types import StringType
    
    def get_day_from_date(dt):
        # parse the 'dd-MM-yyyy HH:mm:ss' string and return the full weekday name
        dt_parsed = datetime.datetime.strptime(dt, '%d-%m-%Y %H:%M:%S')
        return dt_parsed.strftime('%A')
    
    to_day = f.udf(get_day_from_date, StringType())
    df = df.withColumn('dateOccurred_ts', f.to_timestamp('dateOccurred', 'dd-MM-yyyy HH:mm:ss'))
    
    # Option 1: the UDF returns the weekday as the locale's full name
    df = df.withColumn('day', to_day(f.col('dateOccurred')))
    # Option 2: the built-in only returns the day of the week as an integer
    # df = df.withColumn('day', f.dayofweek('dateOccurred_ts'))
    
    df = df.withColumn('hour', f.hour('dateOccurred_ts'))
    df = df.withColumn('day_hour', f.concat_ws('_', f.col('day'), f.col('hour')))
    df = df.groupBy('day_hour').agg(f.count(f.lit(1)).alias('dateOccurred_cnt'))
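Since the UDF body is plain Python, the weekday extraction can be sanity-checked without a Spark session, using a few of the sample dates from the question:

```python
import datetime

def get_day_from_date(dt):
    # parse the 'dd-MM-yyyy HH:mm:ss' string and return the full weekday name
    dt_parsed = datetime.datetime.strptime(dt, '%d-%m-%Y %H:%M:%S')
    return dt_parsed.strftime('%A')

print(get_day_from_date('24-04-2020 10:08:00'))  # Friday
print(get_day_from_date('23-04-2020 15:13:00'))  # Thursday
```

As a side note, if you want to avoid the UDF while still getting the full weekday name, Spark's built-in `f.date_format('dateOccurred_ts', 'EEEE')` should also produce it directly from the timestamp column.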