Tags: scala, apache-spark, group-by, rdd, apache-spark-mllib

How to group by and aggregate multiple fields using combineByKey on an RDD?


I have a sample file and, for a given key field, I am trying to compute the list of values of a second field, the total of a third field, and the number of records, using combineByKey. I already understood how to meet this requirement with aggregateByKey from this question; now I would like to understand combineByKey.

I tried the code below, which mirrors the aggregateByKey version, but I'm getting a type mismatch error. I'm not sure whether my types are correct for createCombiner, mergeValue, or mergeCombiner. Please help me get a good understanding of combineByKey.

Sample data:

44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79 

Code for combineByKey:

val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))

def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)

def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
  println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
  (acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}

def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
  println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
  (acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}

rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)

Error Message:

error: type mismatch;
found   : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
                 ^

The expected result is:

customerid, (orderids,..,..,....), totalamount, number of orderids

Using the provided sample data it will be:

(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))

The mismatch points to createCombiner. Could anyone please help me understand combineByKey?


Solution

  • The problem here is the createCombiner function. Look at combineByKey:

    combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
    

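    Simply put, C is the type you want to end up with, (Seq[Int], Double, Int), and V is the type you start with, (Int, Double). I changed Float to Double here since Double is what is usually used in Spark; for this to type-check, the RDD values must also be parsed with x(2).toDouble so that V really is (Int, Double). createCombiner turns the first value seen for a key into an initial accumulator, so it must take a V, not a C. That means the createCombiner function should look as follows: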

    def createCombiner = (tuple: (Int, Double)) => (Seq(tuple._1), tuple._2, 1)
    

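    Both mergeValue and mergeCombiner look fine, apart from changing the value parameter of mergeValue from (Int, Float) to (Int, Double) to match the switch to Double. Note, however, that you will not see the output of the println statements on the driver if you execute the code on a cluster (see: Spark losing println() on stdout).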
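
To make the roles of the three functions concrete, here is a minimal sketch in plain Scala (no Spark needed, written for the REPL, spark-shell, or a Scala 3 script). The helper combineByKeyLocal is hypothetical and is not part of Spark: it only imitates what combineByKey does, treating each inner Seq as a partition. The split of the sample data into two partitions is also an assumption made for illustration.

```scala
// Accumulator type C: (order ids, total amount, number of orders).
type Acc = (Seq[Int], Double, Int)

// createCombiner: V => C. Called for the first value of a key in a partition.
val createCombiner: ((Int, Double)) => Acc =
  v => (Seq(v._1), v._2, 1)

// mergeValue: (C, V) => C. Folds one more value into an existing accumulator.
val mergeValue: (Acc, (Int, Double)) => Acc =
  (acc, v) => (acc._1 :+ v._1, acc._2 + v._2, acc._3 + 1)

// mergeCombiners: (C, C) => C. Merges accumulators built on different partitions.
val mergeCombiners: (Acc, Acc) => Acc =
  (a, b) => (a._1 ++ b._1, a._2 + b._2, a._3 + b._3)

// Hypothetical local stand-in for RDD.combineByKey (illustration only).
def combineByKeyLocal[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Step 1: inside each partition, build one accumulator per key.
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (m, (k, v)) =>
      m.updated(k, m.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
    }
  }
  // Step 2: merge the per-partition accumulators key by key.
  perPartition.foldLeft(Map.empty[K, C]) { (merged, m) =>
    m.foldLeft(merged) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
    }
  }
}

// Sample data from the question, split into two assumed partitions.
val partitions: Seq[Seq[(Int, (Int, Double))]] = Seq(
  Seq((44, (8602, 37.19)), (44, (8331, 99.19)), (44, (1919, 39.54)), (44, (2682, 41.88))),
  Seq((44, (7366, 66.54)), (44, (3405, 81.09)), (44, (9957, 94.79)))
)

val result = combineByKeyLocal(partitions, createCombiner, mergeValue, mergeCombiners)
result.foreach(println)
```

With Spark itself, the same three function values should be usable as rdd.combineByKey(createCombiner, mergeValue, mergeCombiners), assuming rdd is an RDD[(Int, (Int, Double))] built with x(2).toDouble. On a real cluster the order of the collected order IDs depends on how the data is partitioned, so it may differ from the order in the input file.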