Tags: apache-spark, pyspark, apache-spark-sql, aggregate, window-functions

How to aggregate by hour (including missing hours) and add a cumulative sum?


Suppose I have the Spark DataFrame below:

GroupId  Event_time                  Event_name  Event_value
xx       2011-08-15 14:47:02.617023  eventA      1
xx       2011-08-15 14:48:02.507053  eventA      2
xx       2011-08-15 16:47:02.512016  eventA      100
yy       2011-08-15 11:47:02.337019  eventA      2
yy       2011-08-15 12:47:02.617041  eventA      1
yy       2011-08-15 13:47:02.927040  eventA      3

I would like to get the rolling sum of the eventA value per hour, within each day, for each GroupId.

For example, for GroupId xx and hour 2011-08-15 14:00, that means summing Event_value for eventA from 14:00 up to 15:00; in this case the result should be 1 + 2 = 3. (The tables below call this Count.)

The expected output would be something like this (every hour from 00 to 23 should appear for each day; some hours are omitted below to save space).

If there is no eventA in a given hour, we treat the Count as NA for that hour (and as 0 for the cumulative calculation).

For event date 2011-08-15, group xx has no events before hour 14 and none after hour 16.

GroupId  Date        Hour  Count  agg_count
xx       2011-08-15  00    NA     0
xx       2011-08-15  01    NA     0
xx       2011-08-15  02    NA     0
...
xx       2011-08-15  13    NA     0
xx       2011-08-15  14    3      3
xx       2011-08-15  15    NA     3
xx       2011-08-15  16    100    103
xx       2011-08-15  17    NA     103
...
xx       2011-08-15  23    NA     103

Below is the code I have tried:

from pyspark.sql.functions import col, hour, sum
from pyspark.sql.window import Window

df2 = (df
  .withColumn("Event_time", col("Event_time").cast("timestamp"))
  .withColumn("Date", col("Event_time").cast("date"))
  .withColumn("Hour", hour(col("Event_time"))))

# groupBy().count() names its output column "count" (lowercase), and it
# counts rows rather than summing Event_value
df3 = df2.groupBy("GroupId", "Date", "Hour").count()

df3.withColumn(
  "agg_count",
  sum("count").over(Window.partitionBy("GroupId", "Date").orderBy("Hour")))

However, the above only produces rows for hours that actually contain events, so it cannot display every hour within the day.


Solution

  • You can do this by first creating a table of hours and then joining it with the rest of the data.

    Setup:

    from pyspark.sql import functions as F, Window as W
    df = spark.createDataFrame(
        [('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
         ('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
         ('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
         ('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
         ('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
         ('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
        ['GroupId', 'Event_time', 'Event_name', 'Event_value']
    )
    df = df.withColumn('Date', F.col('Event_time').cast('date'))
    

    The following creates a DataFrame of hourly timestamps covering the data's date range, for every GroupId and Event_name:

    min_date = df.agg(F.min('Date')).head()[0]
    max_date = df.agg(F.max('Date')).head()[0]
    df_hours = df.select(
        'GroupId',
        'Event_name',
        F.explode(F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")).alias('date_hour')
    ).distinct()
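
    As a quick sanity check (optional, and assuming a single day of data as in this example), every (GroupId, Event_name) pair should now have 24 hourly rows:

    df_hours.groupBy('GroupId', 'Event_name').count().show()
    # expect count = 24 for both xx and yy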
    

    Then, aggregating your first table by hour:

    df_agg = (df
        .groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
        .agg(F.sum('Event_value').alias('Count'))
    )
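
    Note that date_trunc('hour', ...) floors each timestamp to the top of its hour, so the 14:47 and 14:48 events both land in the 14:00 bucket. A minimal illustration:

    spark.createDataFrame([('2011-08-15 14:47:02.617023',)], ['ts']) \
        .select(F.date_trunc('hour', F.col('ts').cast('timestamp')).alias('bucket')) \
        .show(truncate=False)
    # bucket: 2011-08-15 14:00:00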
    

    Joining them both together:

    df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')
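
    Since this is a left join from df_hours, hours without events are kept with a null Count; these become the NA rows of the expected output. A quick check (48 hourly rows across the two groups, minus the 5 hours that contain events):

    df_joined.filter(F.col('Count').isNull()).count()
    # 43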
    

    Adding the agg_count column and the remaining output columns (with an orderBy, the window's default frame runs from the start of the partition to the current row, so F.sum over it is a running total):

    w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
    df2 = (df_joined
        .select(
            'GroupId',
            'Event_name',
            F.to_date('date_hour').alias('Date'),
            F.date_format('date_hour', 'HH').alias('Hour'),
            'Count',
            F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
        )
    )
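
    The coalesce is needed because a running sum over an ordered window stays null until the partition's first non-null Count. A toy demonstration (hypothetical data, separate from the example above):

    toy = spark.createDataFrame([(0, None), (1, 3), (2, None)], 'Hour int, Count int')
    toy.withColumn('agg', F.sum('Count').over(W.orderBy('Hour'))).show()
    # Hour 0 -> agg is null; F.coalesce(..., F.lit(0)) maps it to 0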
    

    Result:

    +-------+----------+----------+----+-----+---------+
    |GroupId|Event_name|      Date|Hour|Count|agg_count|
    +-------+----------+----------+----+-----+---------+
    |     xx|    eventA|2011-08-15|  00| null|        0|
    |     xx|    eventA|2011-08-15|  01| null|        0|
    |     xx|    eventA|2011-08-15|  02| null|        0|
    |     xx|    eventA|2011-08-15|  03| null|        0|
    |     xx|    eventA|2011-08-15|  04| null|        0|
    |     xx|    eventA|2011-08-15|  05| null|        0|
    |     xx|    eventA|2011-08-15|  06| null|        0|
    |     xx|    eventA|2011-08-15|  07| null|        0|
    |     xx|    eventA|2011-08-15|  08| null|        0|
    |     xx|    eventA|2011-08-15|  09| null|        0|
    |     xx|    eventA|2011-08-15|  10| null|        0|
    |     xx|    eventA|2011-08-15|  11| null|        0|
    |     xx|    eventA|2011-08-15|  12| null|        0|
    |     xx|    eventA|2011-08-15|  13| null|        0|
    |     xx|    eventA|2011-08-15|  14|    3|        3|
    |     xx|    eventA|2011-08-15|  15| null|        3|
    |     xx|    eventA|2011-08-15|  16|  100|      103|
    |     xx|    eventA|2011-08-15|  17| null|      103|
    |     xx|    eventA|2011-08-15|  18| null|      103|
    |     xx|    eventA|2011-08-15|  19| null|      103|
    |     xx|    eventA|2011-08-15|  20| null|      103|
    |     xx|    eventA|2011-08-15|  21| null|      103|
    |     xx|    eventA|2011-08-15|  22| null|      103|
    |     xx|    eventA|2011-08-15|  23| null|      103|
    |     yy|    eventA|2011-08-15|  00| null|        0|
    |     yy|    eventA|2011-08-15|  01| null|        0|
    |     yy|    eventA|2011-08-15|  02| null|        0|
    |     yy|    eventA|2011-08-15|  03| null|        0|
    |     yy|    eventA|2011-08-15|  04| null|        0|
    |     yy|    eventA|2011-08-15|  05| null|        0|
    |     yy|    eventA|2011-08-15|  06| null|        0|
    |     yy|    eventA|2011-08-15|  07| null|        0|
    |     yy|    eventA|2011-08-15|  08| null|        0|
    |     yy|    eventA|2011-08-15|  09| null|        0|
    |     yy|    eventA|2011-08-15|  10| null|        0|
    |     yy|    eventA|2011-08-15|  11|    2|        2|
    |     yy|    eventA|2011-08-15|  12|    1|        3|
    |     yy|    eventA|2011-08-15|  13|    3|        6|
    |     yy|    eventA|2011-08-15|  14| null|        6|
    |     yy|    eventA|2011-08-15|  15| null|        6|
    |     yy|    eventA|2011-08-15|  16| null|        6|
    |     yy|    eventA|2011-08-15|  17| null|        6|
    |     yy|    eventA|2011-08-15|  18| null|        6|
    |     yy|    eventA|2011-08-15|  19| null|        6|
    |     yy|    eventA|2011-08-15|  20| null|        6|
    |     yy|    eventA|2011-08-15|  21| null|        6|
    |     yy|    eventA|2011-08-15|  22| null|        6|
    |     yy|    eventA|2011-08-15|  23| null|        6|
    +-------+----------+----------+----+-----+---------+