
How to aggregate strings to dictionary-like results in PySpark?


I have a DataFrame that I want to aggregate to a daily level.

data = [
    (125, '2012-10-10', 'good'),
    (20, '2012-10-10', 'good'),
    (40, '2012-10-10', 'bad'),
    (60, '2012-10-10', 'NA')]
df = spark.createDataFrame(data, ["temperature", "date", "performance"])

I can aggregate numerical values using Spark's built-in functions such as max, min, and avg. How can I aggregate strings?

I expect something like:

date       max_temp min_temp performance_frequency
2012-10-10 125      20       "good": 2, "bad": 1, "NA": 1

Solution

  • We can use a UDF with Counter, returning a MapType, to get the value counts:

    from pyspark.sql import functions as F
    from pyspark.sql.types import MapType, StringType, IntegerType
    from collections import Counter
    
    data = [(125, '2012-10-10', 'good'), (20, '2012-10-10', 'good'),
            (40, '2012-10-10', 'bad'), (60, '2012-10-10', 'NA')]
    df = spark.createDataFrame(data, ["temperature", "date", "performance"])
    
    # Count the occurrences of each string in the collected list and
    # return the result as a map<string, int> column.
    udf1 = F.udf(lambda x: dict(Counter(x)), MapType(StringType(), IntegerType()))
    
    df.groupby('date').agg(
        F.min('temperature'),
        F.max('temperature'),
        udf1(F.collect_list('performance')).alias('performance_frequency')
    ).show(1, False)
    +----------+----------------+----------------+---------------------------------+
    |date      |min(temperature)|max(temperature)|performance_frequency            |
    +----------+----------------+----------------+---------------------------------+
    |2012-10-10|20              |125             |Map(NA -> 1, bad -> 1, good -> 2)|
    +----------+----------------+----------------+---------------------------------+
    
    df.groupby('date').agg(
        F.min('temperature'),
        F.max('temperature'),
        udf1(F.collect_list('performance')).alias('performance_frequency')
    ).collect()
    [Row(date='2012-10-10', min(temperature)=20, max(temperature)=125, performance_frequency={'bad': 1, 'good': 2, 'NA': 1})]
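
    If you want to avoid the Python UDF, here is a minimal sketch of a
    UDF-free alternative (assuming Spark 2.4+, where map_from_entries is
    available): count each (date, performance) pair first, collect the
    pairs into a map, and join back the temperature aggregates.

    from pyspark.sql import functions as F

    # Per-date temperature aggregates.
    temps = df.groupby('date').agg(
        F.min('temperature').alias('min_temp'),
        F.max('temperature').alias('max_temp'))

    # Count each performance value per date, then collect the
    # (performance, count) pairs into a single map column.
    freq = (df.groupby('date', 'performance').count()
              .groupby('date')
              .agg(F.map_from_entries(
                       F.collect_list(F.struct('performance', 'count'))
                   ).alias('performance_frequency')))

    temps.join(freq, 'date').show(truncate=False)

    Since everything stays in built-in functions, Spark can optimize the
    whole plan and skip the Python serialization overhead of the UDF.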
    

    Hope this helps!