Tags: python, pyspark, group-by, aggregate-functions, distinct

calculate the sum and countDistinct after groupby in PySpark


I have a PySpark DataFrame and would like to group by several columns, then calculate the sum of some columns and count the distinct values of another column. As countDistinct is not a built-in aggregation function, I can't use simple expressions like the ones I tried here:

sum_cols = ['a', 'b']
count_cols = ['id']
exprs1 = {x: "sum" for x in sum_cols}
exprs2 = {x: "countDistinct" for x in count_cols}  # fails: not a valid SQL function name
exprs = {**exprs1, **exprs2}

df_aggregated = df.groupby('month', 'product').agg(exprs)

I also tried the approach from this answer, as exprs2 = [countDistinct(x) for x in count_cols], but I received the error message AssertionError: all exprs should be Column, even when I tried it for the aggregation column alone.

How could I combine sum and count distinct in one aggregation? I know that I could do it once with the sum columns and once with the countDistinct columns and then join both DataFrames (a rough sketch follows), but there should be a way to do it in one step...
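For reference, a minimal sketch of that two-step workaround, assuming the same sum_cols and count_cols as above:

from pyspark.sql import functions as F

# Aggregate the sums and the distinct counts separately,
# then join the two results back on the grouping keys.
sums = df.groupby('month', 'product').agg(*[F.sum(c) for c in sum_cols])
counts = df.groupby('month', 'product').agg(*[F.countDistinct(c) for c in count_cols])
df_two_step = sums.join(counts, on=['month', 'product'])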


Solution

  • Instead of using the dict version of agg, use the variant that takes a list of Column expressions:

    from pyspark.sql import functions as F

    df = ...
    sum_cols = ['a', 'b']
    count_cols = ['id']

    exprs1 = [F.sum(c) for c in sum_cols]
    exprs2 = [F.countDistinct(c) for c in count_cols]

    # Result columns get names like sum(a) and count(DISTINCT id);
    # add .alias(...) to the expressions to rename them.
    df_aggregated = df.groupby('month', 'product').agg(*(exprs1 + exprs2))
    

    If you want to keep the current dict-based logic, you could switch to approx_count_distinct. Unlike countDistinct, that function is available under its SQL name, so it works in the dict form of agg; see the sketch below. Note that it returns an approximate rather than an exact distinct count.
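    A sketch of that variant, reusing sum_cols and count_cols from above:

    exprs1 = {x: "sum" for x in sum_cols}
    exprs2 = {x: "approx_count_distinct" for x in count_cols}

    df_aggregated = df.groupby('month', 'product').agg({**exprs1, **exprs2})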