New to PySpark. I have this example dataframe:
df = spark.createDataFrame(
(("7dc88", "D21", 14.14, 2, 10, [["msgA", 15, "a"],["msgB", 9, "g"],["msgC", 6, "z"],["msgD", 4, "m"],["msgE", 1, "e"]]),
("1c36a3", "D21", 32.14, 18, 45, [["msgA", 45, "n"],["msgB", 30, "q"],["msgC", 24, "h"],["msgD", 19, "y"],["msgE", 11, "c"]])),
"uniqueId : string, tag : string, score : float, time0 : int, time1 : int, msgs : array<struct<msg : string, time : int, sysid : string>>")
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
|uniqueId|tag|score|time0|time1|msgs |
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
|7dc88   |D21|14.14|2    |10   |[{msgA, 15, a}, {msgB, 9, g}, {msgC, 6, z}, {msgD, 4, m}, {msgE, 1, e}]    |
|1c36a3 |D21|32.14|18 |45 |[{msgA, 45, n}, {msgB, 30, q}, {msgC, 24, h}, {msgD, 19, y}, {msgE, 11, c}]|
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
The msgs column is an array of structs (msg, time, sysid). (In my real use case the message structure has more elements and some are nested structs; each uniqueId can have a couple hundred messages, and there are millions of uniqueIds.) The time0 and time1 columns specify an inclusive time range for a desired subset of the msgs. I need to create a new column msgs_subset that contains that subset, ordered by time:
+--------+---+-----+-----+-----+------------------------------------------------------------+
|uniqueId|tag|score|time0|time1|msgs_subset |
+--------+---+-----+-----+-----+------------------------------------------------------------+
|7dc88   |D21|14.14|2    |10   |[{msgD, 4, m}, {msgC, 6, z}, {msgB, 9, g}]                  |
|1c36a3 |D21|32.14|18 |45 |[{msgD, 19, y}, {msgC, 24, h}, {msgB, 30, q}, {msgA, 45, n}]|
+--------+---+-----+-----+-----+------------------------------------------------------------+
I'm able to create the msgs_subset column using this code:
from pyspark.sql import functions as F

df_msg_subset = (
    df
    # one row per message
    .withColumn("msg_explode", F.explode(F.col("msgs")))
    .withColumn("msg_time", F.col("msg_explode.time"))
    # keep only messages inside the inclusive [time0, time1] range
    .filter((F.col("msg_time") >= F.col("time0")) & (F.col("msg_time") <= F.col("time1")))
    .sort(F.col("uniqueId"), F.col("msg_time"))
    # collapse back to one row per uniqueId
    .groupBy(list(set(df.columns) - {"msgs"}))
    .agg(F.collect_list("msg_explode").alias("msgs_subset"))
)
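For reference, selecting the columns back into a fixed order should reproduce the msgs_subset table above on the toy data (row order aside), since the groupBy keeps only the grouping columns plus the aggregate:

# show the result with the columns in the same order as the expected table
df_msg_subset.select("uniqueId", "tag", "score", "time0", "time1", "msgs_subset").show(truncate=False)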
QUESTION: Can this be done without exploding? I'm assuming that exploding (creating additional rows) only to then groupBy/collapse them again is expensive. How can I do the same using PySpark functions (not a UDF) operating directly on msgs? It feels like it would be something like this, but I'm stuck on the create_subset_column function.
def create_subset_column(msgs, time0, time1):
    return F.sort_array(F.filter(...))

df_msg_subset = (
    df
    .withColumn("msgs_subset", create_subset_column("msgs", "time0", "time1"))
    .drop("msgs")
)
I'd appreciate any help/pointers.
AN EXAMPLE DF WHERE TIMES ARE EPOCH_MILLISEC
df = spark.createDataFrame(
(
("7dc88", "D21", 14.14, 1642970972787, 1642970985027, [
["msgA", 1642970990067, "a"],
["msgB", 1642970985027, "g"],
["msgC", 1642970978077, "z"],
["msgD", 1642970972787, "m"],
["msgE", 1642970960897, "e"],
]),
("1c36a3", "D21", 32.14, 1642971056787, 1642971074107, [
["msgA", 1642971080687, "n"],
["msgB", 1642971074107, "q"],
["msgC", 1642971068777, "h"],
["msgD", 1642971062157, "y"],
["msgE", 1642971056787, "c"],
])
),
"uniqueId:string, tag:string, score:float, time0:long, time1:long, msgs:array<struct<msg:string, time:long, sysid:string>>"
)
Does this help?
df = spark.createDataFrame(
(("7dc88", "D21", 14.14, 2, 10, [["msgA", 15, "a"],["msgB", 9, "g"],["msgC", 6, "z"],["msgD", 4, "m"],["msgE", 1, "e"]]),
("1c36a3", "D21", 32.14, 18, 45, [["msgA", 45, "n"],["msgB", 30, "q"],["msgC", 24, "h"],["msgD", 19, "y"],["msgE", 11, "c"]])),
"uniqueId : string, tag : string, score : float, time0 : int, time1 : int, msgs : array<struct<msg : string, time : int, sysid : string>>")
Original:
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
|uniqueId|tag|score|time0|time1|msgs |
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
|7dc88 |D21|14.14|2 |10 |[{msgA, 15, a}, {msgB, 9, g}, {msgC, 6, z}, {msgD, 4, m}, {msgE, 1, e}] |
|1c36a3 |D21|32.14|18 |45 |[{msgA, 45, n}, {msgB, 30, q}, {msgC, 24, h}, {msgD, 19, y}, {msgE, 11, c}]|
+--------+---+-----+-----+-----+---------------------------------------------------------------------------+
from pyspark.sql.functions import array_sort, expr

df = (df.withColumn("msgs", expr("filter(msgs, msg -> ((msg.time >= time0) and (msg.time <= time1)))"))
        # array_sort compares struct fields left to right, so put time first, sort, then unwrap
        .withColumn("msgs", array_sort(expr("transform(msgs, x -> struct(x.time, x as original))")))
        .withColumn("msgs", expr("transform(msgs, x -> x.original)")))
df.show(truncate=False)
Transformed:
+--------+---+-----+-----+-----+------------------------------------------------------------+
|uniqueId|tag|score|time0|time1|msgs |
+--------+---+-----+-----+-----+------------------------------------------------------------+
|7dc88 |D21|14.14|2 |10 |[{msgD, 4, m}, {msgC, 6, z}, {msgB, 9, g}] |
|1c36a3 |D21|32.14|18 |45 |[{msgD, 19, y}, {msgC, 24, h}, {msgB, 30, q}, {msgA, 45, n}]|
+--------+---+-----+-----+-----+------------------------------------------------------------+
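The trick is that array_sort on an array of structs compares the struct fields left to right, so wrapping each element in a struct whose first field is time sorts the array by time, and the last transform unwraps the original element. On Spark 3.0+ you can also skip the wrap/unwrap by passing a comparator lambda to array_sort in SQL; a sketch (assuming msg.time is never null), writing the result to a new msgs_subset column as in your question:

from pyspark.sql.functions import expr

df_msg_subset = (
    df
    # filter to the inclusive [time0, time1] range, then sort ascending by time (Spark 3.0+ comparator)
    .withColumn("msgs_subset", expr("""
        array_sort(
            filter(msgs, m -> m.time >= time0 and m.time <= time1),
            (a, b) -> case when a.time < b.time then -1
                           when a.time > b.time then 1
                           else 0 end
        )
    """))
    .drop("msgs")
)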