Tags: scala, apache-spark, dataframe, apache-spark-sql, orc

Aggregating multiple columns with custom function in Spark


I was wondering if there is some way to specify a custom aggregation function for Spark DataFrames over multiple columns.

I have a table of (name, item, price) rows like this:

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59


I would like to aggregate each item and its cost for each person into a list like this:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

Is this possible with DataFrames? I recently learned about collect_list, but it appears to only work for one column.
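
To show what I mean, here is a rough sketch of the single-column version I have so far (just illustrative, assuming the example table above is loaded into a DataFrame called df):

    import org.apache.spark.sql.functions.{col, collect_list}
    import sqlContext.implicits._
    
    // The example table above as a DataFrame (column names are just
    // the labels from my example):
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "item", "price")
    
    // collect_list gives me one list per person, but only for a single
    // column, e.g. john -> [tomato, carrot, banana] (ordering not guaranteed):
    df.groupBy("name").agg(collect_list(col("item")) as "items").show(false)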


Solution

  • The easiest way to do this with DataFrames is to first collect the two columns into lists, and then use a UDF to zip those lists together. Something like:

    import org.apache.spark.sql.functions.{col, collect_list, udf}
    import sqlContext.implicits._
    
    // UDF that zips the two collected lists into a single list of
    // (food, price) pairs:
    val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))
    
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")
    
    // Collect both columns into per-name lists, zip them into (food, price)
    // pairs, then drop the now-redundant price column:
    val df2 = df.groupBy("name").agg(
      collect_list(col("food")) as "food",
      collect_list(col("price")) as "price"
    ).withColumn("food", zipper(col("food"), col("price"))).drop("price")
    
    df2.show(false)
    // +----+---------------------------------------------+
    // |name|food                                         |
    // +----+---------------------------------------------+
    // |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
    // |bill|[[apple,0.99], [taco,2.59]]                  |
    // +----+---------------------------------------------+
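
  • A possible alternative on newer Spark versions (2.0+), where collect_list also accepts struct columns, is to collect the (food, price) pairs directly and skip the zip UDF entirely. A rough sketch, reusing the df defined above:

    import org.apache.spark.sql.functions.{col, collect_list, struct}
    
    // Collect (food, price) pairs as an array of structs in one pass,
    // no UDF needed (assumes a Spark version where collect_list supports
    // non-primitive types, i.e. 2.0+):
    val df3 = df.groupBy("name").agg(
      collect_list(struct(col("food"), col("price"))) as "food"
    )
    df3.show(false)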