scala, apache-spark, rdd

Summing values per key in an RDD with reduceByKey


I create an RDD like this:

val rdd = sc.parallelize(Seq(
  ("A:0.740:8, B:0.242:5, C:0.145:2"),
  ("A:0.232:8, C:0.14:7, D:0.164:5, E:0.34:2, F:0.37:1"),
  ("B:0.245:13, H:0.123:4, I:0.673:2")
))

I want to reduce by the key (the letter) and sum the last two elements of each entry, to get results like:

"A:##:##"
"B:##:##"
"C:##:##"
...

What should I code?


Solution

  • Here is what you need. First, flatMap the records on "," and split each piece on ":", taking the first element as the key; then use reduceByKey to sum the remaining two values:

      // toDF() requires the SparkSession implicits in scope:
      // import spark.implicits._
      rdd.flatMap(_.split(","))
        .map { x =>
          val s = x.split(":")
          (s(0).trim, (s(1).toDouble, s(2).toInt))
        }
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .map(r => r._1 + ":" + r._2._1 + ":" + r._2._2)
        .toDF().show(false)
    

    Alternatively, convert to a DataFrame and concatenate afterwards, which is clearer and easier to follow:

    // also needs: import spark.implicits._
    import org.apache.spark.sql.functions.{concat_ws, sum}

    rdd.flatMap(_.split(","))
      .map { x =>
        val s = x.split(":")
        (s(0).trim, s(1).toDouble, s(2).toInt)
      }
      .toDF("a", "b", "c")
      .groupBy("a")
      .agg(sum("b").as("sumB"), sum("c").as("sumC"))
      .select(concat_ws(":", $"a", $"sumB", $"sumC").as("value"))
      .show(false)
    

    Output:

    +-----------------------+
    |value                  |
    +-----------------------+
    |H:0.246:8              |
    |A:0.972:16             |
    |I:0.673:2              |
    |B:0.487:18             |
    |C:0.28500000000000003:9|
    |D:0.164:5              |
    |E:0.34:2               |
    |F:0.37:1               |
    +-----------------------+
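
    As an aside, the `C:0.28500000000000003:9` row is a binary floating-point artifact (0.145 + 0.14 is not exactly 0.285 as a double). If clean output matters, you can round the double sum before formatting. Here is a minimal, Spark-free sketch of the same parse/group/sum logic on a plain Seq, using a hypothetical `SumByKey` object with BigDecimal rounding (assumed names, not part of the original answer):

    ```scala
    // Spark-free sketch of the same parse / group / sum logic on a plain Seq,
    // with the double sums rounded to avoid binary floating-point artifacts.
    object SumByKey {
      def sumByKey(records: Seq[String]): Map[String, (Double, Int)] =
        records
          .flatMap(_.split(","))        // one "K:double:int" piece per entry
          .map { piece =>
            val s = piece.trim.split(":")
            (s(0), (s(1).toDouble, s(2).toInt))
          }
          .groupBy(_._1)                // plain-Scala stand-in for reduceByKey
          .map { case (k, vs) =>
            val d = vs.map(_._2._1).sum
            val i = vs.map(_._2._2).sum
            // round to 3 decimals so 0.145 + 0.14 prints as 0.285,
            // not 0.28500000000000003
            (k, (BigDecimal(d).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble, i))
          }

      def main(args: Array[String]): Unit = {
        val data = Seq(
          "A:0.740:8, B:0.242:5, C:0.145:2",
          "A:0.232:8, C:0.14:7, D:0.164:5, E:0.34:2, F:0.37:1",
          "B:0.245:13, H:0.123:4, I:0.673:2"
        )
        sumByKey(data).toSeq.sortBy(_._1).foreach { case (k, (d, i)) =>
          println(s"$k:$d:$i")
        }
      }
    }
    ```

    In a real job you would keep `reduceByKey` and round only at output time (e.g. with `format_number` in the DataFrame version), so the rounding does not accumulate across partial aggregations.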
    

    Hope this helped!