I have a table data containing three columns: id, time, and text. Rows with the same id together make up one long text, whose pieces are ordered by time. The goal is to group by id, order by time, and then aggregate the rows (concatenate all of the text). I am using PySpark.
I can get the order of elements within groups using a window function:
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
w = W.partitionBy(F.col('id')).orderBy(F.col('time').asc())
(
data
.withColumn('text_num', F.row_number().over(w))
)
I can also concatenate the texts by group, but this does not guarantee the order:
(
data
.groupby('id')
.agg(F.concat_ws(' ', F.collect_list(F.col('text'))).alias('concat_text'))
)
How do I groupby, sort within groups, and then aggregate the sorted data?
Suppose we have a dataframe:
data = spark.createDataFrame(
[('a', 1, 'str1'), ('a', 2, 'str2')],
schema=['id', 'time', 'text']
)
data.printSchema()
data.show(10, False)
+---+----+----+
|id |time|text|
+---+----+----+
|a  |1   |str1|
|a  |2   |str2|
+---+----+----+
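You can use collect_list as a window function. Over a window partitioned by id and ordered by time, it collects the texts of each group cumulatively, in time order, so each row holds a prefix of the full list. Taking the max list per id then selects the complete one:
import pyspark.sql.functions as func
from pyspark.sql import Window

data.withColumn(
    # running list of texts, in time order, within each id
    'text_lst', func.collect_list('text').over(Window.partitionBy('id').orderBy(func.asc('time')))
).groupBy(
    'id'
).agg(
    # arrays compare element by element, so the complete list is the max
    func.concat_ws('-', func.max('text_lst')).alias('text_concat')
).show(
    10, False
)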
+---+-----------+
|id |text_concat|
+---+-----------+
|a  |str1-str2  |
+---+-----------+
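To see why taking max returns the complete list, here is a plain-Python model of the window step (no Spark involved; the helper names running_lists and concat_per_id and the sample rows are illustrative assumptions). Within each id, the ordered window produces cumulative prefixes. Because a shorter prefix compares lower than the list it is a prefix of, the full list is the maximum. Python lists compare the same way, and the sketch relies on that.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows: (id, time, text), deliberately out of order.
rows = [("a", 2, "str2"), ("b", 1, "x"), ("a", 1, "str1")]


def running_lists(rows):
    """Model collect_list('text') over a window partitioned by id, ordered by time.

    Returns one (id, cumulative_text_list) pair per input row.
    Assumes times are distinct within an id (Spark's default RANGE frame
    would put rows with tied times into the same running list).
    """
    out = []
    ordered = sorted(rows, key=itemgetter(0, 1))  # partition by id, order by time
    for key, group in groupby(ordered, key=itemgetter(0)):
        acc = []
        for _, _, text in group:
            acc.append(text)
            out.append((key, list(acc)))  # snapshot of the running prefix
    return out


def concat_per_id(rows, sep="-"):
    """Model groupBy('id').agg(concat_ws(sep, max('text_lst')))."""
    best = {}
    for key, lst in running_lists(rows):
        # Every prefix compares lower than the full list, so max keeps the full one.
        if key not in best or lst > best[key]:
            best[key] = lst
    return {key: sep.join(lst) for key, lst in best.items()}


print(concat_per_id(rows))
```

The order of the input rows does not matter here, because the ordering comes from time rather than from row position.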
Edit 1: another solution without using max
If you don't want to build a running text list for every row, you can use struct to combine the time and text columns into a new column, collect those structs per id, and sort them:
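import pyspark.sql.functions as func

data.withColumn(
    # pair each text with its time; time comes first so it drives the sort
    'struct', func.struct('time', 'text')
).groupBy(
    'id'
).agg(
    func.array_sort(func.collect_list(func.col('struct'))).alias('sort_array')
).withColumn(
    # extract the text field from every struct in the sorted array, then join
    'text_concat', func.concat_ws('-', func.col('sort_array').getItem('text'))
).show(
    10, False
)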
+---+----------------------+-----------+
|id |sort_array            |text_concat|
+---+----------------------+-----------+
|a  |[{1, str1}, {2, str2}]|str1-str2  |
+---+----------------------+-----------+
When you collect the structs into a list and sort it with array_sort, the structs are compared field by field, so the array is ordered by the first field (time), and text only breaks ties. After sorting, you can extract the text field from the structs and concatenate it.
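The struct approach can likewise be modelled in plain Python with tuples, which also compare field by field. This is a sketch, not Spark code; concat_sorted and the sample rows are hypothetical. Putting time first in each tuple is what makes the sort follow time.

```python
from collections import defaultdict

# Hypothetical sample rows: (id, time, text), out of order within each id.
rows = [("a", 2, "str2"), ("a", 1, "str1"), ("b", 5, "y"), ("b", 3, "x")]


def concat_sorted(rows, sep="-"):
    """Model groupBy('id').agg(array_sort(collect_list(struct('time', 'text'))))
    followed by concat_ws(sep, <text field of each sorted struct>)."""
    groups = defaultdict(list)
    for id_, time, text in rows:
        groups[id_].append((time, text))  # plays the role of struct('time', 'text')
    result = {}
    for id_, structs in groups.items():
        structs.sort()  # like array_sort: by time first, text only on ties
        result[id_] = sep.join(text for _, text in structs)  # extract field, join
    return result


print(concat_sorted(rows))
```

Because the whole tuple is compared, two rows with the same time end up ordered by their text, which mirrors how struct comparison breaks ties.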