Let's say I have the following Spark DataFrame:
+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-08-13 04:12:11|B       |
+-------------------+--------+
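For reference, a minimal snippet to reproduce this frame (assuming an active SparkSession named spark):
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        ("2021-08-11 04:05:06", "A"),
        ("2021-08-11 04:15:06", "B"),
        ("2021-08-11 09:15:26", "A"),
        ("2021-08-11 11:04:06", "B"),
        ("2021-08-11 14:55:16", "A"),
        ("2021-08-13 04:12:11", "B"),
    ],
    ["timestamp", "UserName"],
).withColumn("timestamp", F.to_timestamp("timestamp"))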
I want to build time-series data at a desired time resolution, based on event counts for each UserName. The time frames need to be kept within the Spark DataFrame; possible approaches might be event-time aggregation and watermarking (as in Apache Spark's Structured Streaming), a UDF, or hacking it via toPandas(). So, for a 24-hour (daily) time frame, the expected results after groupBy should look like this:
+------------------------------------------+-------------+-------------+
|window_frame_24_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 23:59:59}|3            |2            |
|{2021-08-12 00:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 23:59:59}|0            |1            |
+------------------------------------------+-------------+-------------+
Edit 1: in the case of a 12-hour time frame/resolution:
+------------------------------------------+-------------+-------------+
|window_frame_12_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 11:59:59}|2            |2            |
|{2021-08-11 12:00:00, 2021-08-11 23:59:59}|1            |0            |
|{2021-08-12 00:00:00, 2021-08-12 11:59:59}|0            |0            |
|{2021-08-12 12:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 11:59:59}|0            |1            |
|{2021-08-13 12:00:00, 2021-08-13 23:59:59}|0            |0            |
+------------------------------------------+-------------+-------------+
Group by the time window '1 day' plus UserName to get the counts, then group by the window frame and pivot the user names:
from pyspark.sql import functions as F
result = df.groupBy(
    F.window("timestamp", "1 day").alias("window_frame_24_Hours"),
    "UserName"
).count().groupBy("window_frame_24_Hours").pivot("UserName").agg(
    F.first("count")
).na.fill(0)
result.show(truncate=False)
#+------------------------------------------+---+---+
#|window_frame_24_Hours                     |A  |B  |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
#+------------------------------------------+---+---+
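For the 12-hour resolution from Edit 1, the same pattern should work with a shorter window duration; a minimal sketch, assuming the same df (note that, as above, Spark reports the window end as the exclusive bound, e.g. 12:00:00 rather than 11:59:59):
# same aggregation, only the tumbling window duration changes
result_12h = df.groupBy(
    F.window("timestamp", "12 hours").alias("window_frame_12_Hours"),
    "UserName"
).count().groupBy("window_frame_12_Hours").pivot("UserName").agg(
    F.first("count")
).na.fill(0)
result_12h.show(truncate=False)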
If you need the missing dates, you'll have to generate all the dates between the min and max timestamps using sequence, then join with the original dataframe:
# Build all daily frames between the min and max dates, then cross join
# with the distinct users so that every (frame, user) pair exists.
intervals_df = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 1 day) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i != 0, struct(date_trunc('dd', dates[i-1]) as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)
# Left join each event into its frame (matching on user), then count
# events per frame and pivot by user name; count ignores nulls, so
# frames without events get 0.
result = intervals_df.alias("a").join(
    df.alias("b"),
    F.col("b.timestamp").between(F.col("frame.start"), F.col("frame.end"))
    & (F.col("a.UserName") == F.col("b.UserName")),
    "left"
).groupBy(
    F.col("frame").alias("window_frame_24_Hours")
).pivot("a.UserName").agg(
    F.count("b.UserName")
)
result.show(truncate=False)
#+------------------------------------------+---+---+
#|window_frame_24_Hours                     |A  |B  |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
#|{2021-08-12 00:00:00, 2021-08-13 00:00:00}|0  |0  |
#+------------------------------------------+---+---+
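The gap-filling variant should extend to the 12-hour resolution by stepping the generated frames by 12 hours instead of 1 day; a sketch, assuming the same df (the join, groupBy and pivot stay exactly as above):
# Hypothetical 12-hour variant: frames step by 12 hours instead of 1 day.
intervals_12h = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 12 hours) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i != 0, struct(dates[i-1] as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)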