
Spark [Scala]: Reduce a nested tuple value by key


Let's say I have a Spark Scala program with an RDD named mention_rdd that looks as follows:

(name, (filename, sum))
...
(Maria, (file0, 3))
(John, (file0, 1))
(Maria, (file1, 6))
(Maria, (file2, 1))
(John, (file2, 3))
...

Here each record pairs a name with a filename and the number of occurrences of that name in the file.

I want to reduce this and find, for each name, the filename with the maximum number of occurrences. For example:

(name, (filename, max(sum)))
...
(Maria, (file1, 6))
(John, (file2, 3))
...

I tried to access the (filename, sum) tuples of the RDD on their own, so I could then reduce by name (an earlier attempt had failed with an error saying I can't traverse mention_rdd because (String, Int) is not a TraversableOnce type):

val output = mention_rdd.flatMap(file_counts => file_counts._2.map(file_counts._2._1, file_counts._2._2))
    .reduceByKey((a, b) => if (a > b) a else b)

But I got an error saying: value map is not a member of (String, Int).

Is this possible to do within Spark? And if so, how? Was my approach flawed from the start?


Solution

  • Why not just:

    val output = mention_rdd.reduceByKey {
      // For each name, keep whichever (filename, sum) pair has the larger sum.
      case ((file1, sum1), (file2, sum2)) =>
        if (sum2 >= sum1) (file2, sum2)
        else (file1, sum1)
    }