
Groupby, sort within groups, and aggregate the sorted data


I have a DataFrame data with three columns: id, time, and text. Rows with the same id are consecutive pieces of one long text, ordered by time. The goal is to group by id, order each group by time, and then aggregate the pieces (concatenate all the text). I am using PySpark.

I can get the order of elements within groups using a window function:

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

w = W.partitionBy(F.col('id')).orderBy(F.col('time').asc())
(
    data
    .withColumn('text_num', F.row_number().over(w))
)

I can also concatenate the texts by group, but this does not guarantee the order:

(
    data
    .groupby('id')
    .agg(F.concat_ws(' ', F.collect_list(F.col('text'))).alias('concat_text'))
)

How do I groupby, sort within groups, and then aggregate the sorted data?
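For reference, the intended result can be modeled in plain Python. This is not Spark code, and the rows below are hypothetical sample data; it only pins down what "group, sort within groups, then concatenate" should produce.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample rows: (id, time, text), deliberately out of order.
rows = [
    ("a", 2, "str2"),
    ("b", 1, "x1"),
    ("a", 1, "str1"),
    ("b", 2, "x2"),
]


def concat_by_id(rows, sep=" "):
    """Group rows by id, order each group by time, and join the texts."""
    # Sorting by (id, time) makes each id's rows contiguous and time-ordered,
    # which is what itertools.groupby needs.
    ordered = sorted(rows, key=itemgetter(0, 1))
    return {
        key: sep.join(text for _, _, text in group)
        for key, group in groupby(ordered, key=itemgetter(0))
    }


print(concat_by_id(rows))  # {'a': 'str1 str2', 'b': 'x1 x2'}
```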


Solution

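  • Suppose we have a DataFrame (where spark is an active SparkSession):

    data = spark.createDataFrame(
        [('a', 1, 'str1'), ('a', 2, 'str2')],
        schema=['id', 'time', 'text']
    )
    
    data.printSchema()
    root
     |-- id: string (nullable = true)
     |-- time: long (nullable = true)
     |-- text: string (nullable = true)
    
    data.show(truncate=False)
    +---+----+----+
    |id |time|text|
    +---+----+----+
    |a  |1   |str1|
    |a  |2   |str2|
    +---+----+----+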
    

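    You can use collect_list over a window ordered by time. With the default window frame (start of the group up to the current row), each row gets the list of texts seen so far, so the last row holds the complete, ordered list. Every shorter list is a prefix of that one, so max picks it, and concat_ws joins it: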

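    import pyspark.sql.functions as func
    from pyspark.sql.window import Window

    # Running list of texts per id, in time order
    w = Window.partitionBy('id').orderBy(func.asc('time'))

    data.withColumn(
        'text_lst', func.collect_list('text').over(w)
    ).groupBy(
        'id'
    ).agg(
        # the longest (and therefore greatest) running list holds every text of the group
        func.concat_ws('-', func.max('text_lst')).alias('text_concat')
    ).show(
        10, False
    )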
    +---+-----------+
    |id |text_concat|
    +---+-----------+
    |a  |str1-str2  |
    +---+-----------+
    

    Edit 1: another solution without max. If you don't want to build a running text list for every row, you can pack time and text into a struct column, collect the structs per id, and sort the resulting array:

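    import pyspark.sql.functions as func

    data.withColumn(
        'struct', func.struct('time', 'text')   # time first, so it drives the sort
    ).groupBy(
        'id'
    ).agg(
        func.array_sort(func.collect_list(func.col('struct'))).alias('sort_array')
    ).withColumn(
        # getItem('text') on an array of structs returns the array of text fields
        'text_concat', func.concat_ws('-', func.col('sort_array').getItem('text'))
    ).show(
        10, False
    )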
    +---+----------------------+-----------+
    |id |sort_array            |text_concat|
    +---+----------------------+-----------+
    |a  |[{1, str1}, {2, str2}]|str1-str2  |
    +---+----------------------+-----------+
    

    Because the array holds structs, array_sort compares them field by field: first by time, and by text only when two times are equal. Once the array is sorted, extract the text field from each struct and concatenate the results.
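The ordering logic behind both answers can be sketched in plain Python. This is a model, not Spark code: it assumes distinct time values within a group (with ties, Spark's default RANGE frame would give tied rows the same running list), and the sample rows and helper names are hypothetical.

```python
# Plain-Python model (not Spark) of the two answers above.
# Hypothetical sample rows for one id: (id, time, text), out of order.
rows = [("a", 2, "str2"), ("a", 1, "str1"), ("a", 3, "str3")]


def running_lists(group):
    """Mimic collect_list over a window ordered by time: each row gets the
    texts from the start of the group up to and including itself.
    Assumes distinct time values."""
    ordered = sorted(group, key=lambda row: row[1])
    return [[text for _, _, text in ordered[: i + 1]] for i in range(len(ordered))]


def via_max(group, sep="-"):
    # Every running list is a prefix of the last one, and a prefix compares
    # as smaller than its extension, so max() returns the complete list.
    return sep.join(max(running_lists(group)))


def via_struct_sort(group, sep="-"):
    # Tuples compare field by field, like structs in Spark's array_sort:
    # time decides the order and text only breaks ties.
    structs = sorted((time, text) for _, time, text in group)
    return sep.join(text for _, text in structs)


print(running_lists(rows))  # [['str1'], ['str1', 'str2'], ['str1', 'str2', 'str3']]
print(via_max(rows))  # str1-str2-str3
print(via_struct_sort(rows))  # str1-str2-str3
```

The model also shows why the struct version is likely cheaper for long groups: it builds one list per id, while the window version builds a growing list on every row.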