
Apache Flink - Sum and keep grouped


Suppose I have records like this:

("a-b", "data1", 1)
("a-c", "data2", 1)
("a-b", "data3", 1)

How can I group and sum in Apache Flink, such that I have the following results?

("a-b", ["data1", "data3"], 2)
("a-c", ["data2"], 1)

Regards, Kevin


Solution

  • I got this working in the Flink Scala shell ($FLINK_HOME/bin/start-scala-shell.sh local), where benv is the pre-bound batch environment, with the following code:

    import org.apache.flink.util.Collector
    benv.
      fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1)).
      groupBy(0).
      reduceGroup {
        (it: Iterator[(String, String, Int)], out: Collector[(String, List[String], Int)]) => {
          // Watch out: this materializes the whole group in memory,
          // so a _very_ large group can lead to OOM errors
          val group = it.toList
          // Guard against an empty group before reading its first element
          if (group.nonEmpty)
            // Emit the key, all second-column values, and the sum of the third column
            out.collect((group.head._1, group.map(_._2), group.map(_._3).sum))
        }
      }.print()
    

    This prints the following output:

    (a-b,List(data1, data3),2)
    (a-c,List(data2),1)
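
To check the grouping logic without a Flink cluster or shell, the same transformation can be sketched with plain Scala collections. This is only an illustration of what the reduceGroup function computes per key, not Flink code: the object name GroupAndSum and the helper groupAndSum are made up for this example, and the sort by key is added only to make the output order deterministic (Flink itself gives no ordering guarantee across groups).

```scala
object GroupAndSum {
  // Hypothetical helper (not part of Flink): groups records by their first
  // field, collects the second field into a list, and sums the third field.
  def groupAndSum(records: Seq[(String, String, Int)]): Seq[(String, List[String], Int)] =
    records
      .groupBy(_._1)   // Map from key to all records with that key
      .toSeq
      .sortBy(_._1)    // sort by key only so the result order is deterministic
      .map { case (key, group) =>
        (key, group.map(_._2).toList, group.map(_._3).sum)
      }

  def main(args: Array[String]): Unit = {
    val records = Seq(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1))
    groupAndSum(records).foreach(println)
  }
}
```

Like the reduceGroup version, this holds each whole group in memory, so it is suitable for checking the expected result on small inputs rather than for large data sets.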