Tags: group-by, pyspark, aggregate

pyspark: aggregate on the most frequent value in a column


  from pyspark.sql.functions import count, sum

  aggregated_table = df_input.groupBy('city', 'income_bracket') \
      .agg(
          count('suburb').alias('suburb'),
          sum('population').alias('population'),
          sum('gross_income').alias('gross_income'),
          sum('no_households').alias('no_households'))

I would like to group by city and income bracket, but within each city certain suburbs fall into different income brackets. How do I group by the most frequently occurring income bracket per city?

For example:

city1 suburb1 income_bracket_10 
city1 suburb1 income_bracket_10 
city1 suburb2 income_bracket_10 
city1 suburb3 income_bracket_11 
city1 suburb4 income_bracket_10 

city1 would be grouped under income_bracket_10.
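A toy version of df_input reproducing these rows might look like the following (only the three columns shown above are included; the real data also has population, gross_income and no_households):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Toy df_input containing only the columns from the example rows;
    # the full dataset also has population, gross_income and no_households.
    df_input = spark.createDataFrame(
        [
            ("city1", "suburb1", "income_bracket_10"),
            ("city1", "suburb1", "income_bracket_10"),
            ("city1", "suburb2", "income_bracket_10"),
            ("city1", "suburb3", "income_bracket_11"),
            ("city1", "suburb4", "income_bracket_10"),
        ],
        ["city", "suburb", "income_bracket"],
    )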


Solution

  • Using a window function before aggregating might do the trick:

    from pyspark.sql import Window
    import pyspark.sql.functions as psf

    # Frequency of each income bracket within its city
    w_bracket = Window.partitionBy('city', 'income_bracket')
    # Used to rank income brackets within each city by that frequency
    w_city = Window.partitionBy('city')

    aggregated_table = df_input.withColumn(
        "count",
        psf.count("*").over(w_bracket)
    ).withColumn(
        # dense_rank (rather than row_number) keeps every row of the most
        # frequent bracket, not just a single row per city; ties between
        # equally frequent brackets are all kept
        "rn",
        psf.dense_rank().over(w_city.orderBy(psf.desc("count")))
    ).filter("rn = 1").groupBy('city', 'income_bracket').agg(
        psf.count('suburb').alias('suburb'),
        psf.sum('population').alias('population'),
        psf.sum('gross_income').alias('gross_income'),
        psf.sum('no_households').alias('no_households'))
    

    You can also use a window function after aggregating, since the aggregation already keeps a count of (city, income_bracket) occurrences in the suburb column; a sketch of that variant follows.
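    A minimal sketch of that variant, assuming the suburb count produced by the question's aggregation can serve as the frequency of each (city, income_bracket) pair (the names aggregated and most_frequent are illustrative):

    from pyspark.sql import Window
    import pyspark.sql.functions as psf

    # Aggregate first, exactly as in the question; the 'suburb' column
    # then holds the number of rows per (city, income_bracket).
    aggregated = df_input.groupBy('city', 'income_bracket').agg(
        psf.count('suburb').alias('suburb'),
        psf.sum('population').alias('population'),
        psf.sum('gross_income').alias('gross_income'),
        psf.sum('no_households').alias('no_households'))

    # Then keep, per city, the row whose income bracket occurs most often
    # (row_number breaks ties between equally frequent brackets arbitrarily).
    w = Window.partitionBy('city').orderBy(psf.desc('suburb'))
    most_frequent = aggregated.withColumn(
        "rn", psf.row_number().over(w)
    ).filter("rn = 1").drop("rn")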