Tags: apache-spark, pyspark, apache-spark-sql, databricks, azure-databricks

What is the best way to build event counts at a chosen time resolution for multiple names in a Spark dataframe with groupBy?


Let's say I have the following Spark dataframe:

+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-08-13 04:12:11|B       |
+-------------------+--------+
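
For reference, here's a minimal snippet that recreates this frame (assuming an active SparkSession named spark):

    from pyspark.sql import functions as F
    
    # Sample data matching the table above
    df = spark.createDataFrame(
        [
            ("2021-08-11 04:05:06", "A"),
            ("2021-08-11 04:15:06", "B"),
            ("2021-08-11 09:15:26", "A"),
            ("2021-08-11 11:04:06", "B"),
            ("2021-08-11 14:55:16", "A"),
            ("2021-08-13 04:12:11", "B"),
        ],
        ["timestamp", "UserName"],
    ).withColumn("timestamp", F.to_timestamp("timestamp"))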

I want to build time-series data at a desired time resolution based on event counts for each user.

  • Note 1: obviously, after grouping by UserName and counting within the desired time frame/resolution, the time frames need to be kept in the Spark frame (perhaps using event-time aggregation and watermarking from Apache Spark’s Structured Streaming).
  • Note 2: missing time frames need to be filled in, with a count of 0 when there are no events.
  • Note 3: I'm not interested in using a UDF or hacking it via toPandas().

So for a 24-hour (daily) time frame, the expected result after groupBy should look like this:

+------------------------------------------+-------------+-------------+
|window_frame_24_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 23:59:59}|3            |2            |
|{2021-08-12 00:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 23:59:59}|0            |1            |
+------------------------------------------+-------------+-------------+

Edit1: in the case of a 12-hour time frame/resolution:

+------------------------------------------+-------------+-------------+
|window_frame_12_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 11:59:59}|2            |2            |
|{2021-08-11 12:00:00, 2021-08-11 23:59:59}|1            |0            |
|{2021-08-12 00:00:00, 2021-08-12 11:59:59}|0            |0            |
|{2021-08-12 12:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 11:59:59}|0            |1            |
|{2021-08-13 12:00:00, 2021-08-13 23:59:59}|0            |0            |
+------------------------------------------+-------------+-------------+

Solution

  • Group by the time window '1 day' plus UserName to count events, then group by the window frame and pivot the user names:

    from pyspark.sql import functions as F
    
    result = df.groupBy(
        F.window("timestamp", "1 day").alias("window_frame_24_Hours"),
        "UserName"
    ).count().groupBy("window_frame_24_Hours").pivot("UserName").agg(
       F.first("count")
    ).na.fill(0)
    
    result.show(truncate=False)
    
    #+------------------------------------------+---+---+
    #|window_frame_24_Hours                     |A  |B  |
    #+------------------------------------------+---+---+
    #|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
    #|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
    #+------------------------------------------+---+---+
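    
    For the 12-hour resolution from Edit1, the same pattern should work by swapping the window duration; a minimal sketch (only the window size and the frame column name change):
    
    result_12h = df.groupBy(
        F.window("timestamp", "12 hours").alias("window_frame_12_Hours"),
        "UserName"
    ).count().groupBy("window_frame_12_Hours").pivot("UserName").agg(
        F.first("count")
    ).na.fill(0)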
    

    If you need the missing dates, you'll have to generate all dates with sequence between the min and max timestamps, then join with the original dataframe:

    # Build one {start, end} frame per day between the min and max timestamps,
    # then cross join with the distinct user names so empty frames are kept
    intervals_df = df.withColumn(
        "timestamp",
        F.date_trunc("day", "timestamp")
    ).selectExpr(
        "sequence(min(timestamp), max(timestamp + interval 1 day), interval 1 day) as dates"
    ).select(
        F.explode(
            F.expr("transform(dates, (x, i) -> IF(i!=0, struct(date_trunc('dd', dates[i-1]) as start, dates[i] as end), null))")
        ).alias("frame")
    ).filter("frame is not null").crossJoin(
        df.select("UserName").distinct()
    )
    
    # Left join the events into their frames, then pivot the user names and count
    result = intervals_df.alias("a").join(
        df.alias("b"),
        F.col("timestamp").between(F.col("frame.start"), F.col("frame.end"))
        & (F.col("a.UserName") == F.col("b.UserName")),
        "left"
    ).groupBy(
        F.col("frame").alias("window_frame_24_Hours")
    ).pivot("a.UserName").agg(
        F.count("b.UserName")
    )
    
    result.show(truncate=False)
    
    #+------------------------------------------+---+---+
    #|window_frame_24_Hours                     |A  |B  |
    #+------------------------------------------+---+---+
    #|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
    #|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
    #|{2021-08-12 00:00:00, 2021-08-13 00:00:00}|0  |0  |
    #+------------------------------------------+---+---+
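    
    For other resolutions such as the 12-hour frames from Edit1, the same gap-filling idea can be adapted: generate one frame start per step with sequence, attach every user, and left join the windowed counts. A minimal sketch under that assumption (the names frames, counts and result_12h_filled are illustrative, not from the original answer):
    
    # Generate every 12-hour frame start between the first and last day (sketch)
    frames = df.selectExpr(
        "sequence(date_trunc('day', min(timestamp)),"
        " date_trunc('day', max(timestamp)) + interval 12 hours,"
        " interval 12 hours) as starts"
    ).select(F.explode("starts").alias("frame_start"))
    
    # Count events per 12-hour window and user, keeping only the window start
    counts = df.groupBy(
        F.window("timestamp", "12 hours").alias("w"), "UserName"
    ).count().select(F.col("w.start").alias("frame_start"), "UserName", "count")
    
    # Attach every user to every frame, left join the counts and pivot;
    # frames with no events end up as 0 after na.fill
    result_12h_filled = frames.crossJoin(df.select("UserName").distinct()).join(
        counts, ["frame_start", "UserName"], "left"
    ).groupBy("frame_start").pivot("UserName").agg(
        F.first("count")
    ).na.fill(0).orderBy("frame_start")
    
    This keeps only the frame start; if you need the {start, end} struct, the end can be derived as frame_start + interval 12 hours.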