Tags: scala, apache-spark, rdd

Aggregating sum for RDD in Scala (Spark)


If I have a variable such as books: RDD[(String, Integer, Integer)], how do I merge entries that share the same String (which could represent a title) and sum the two corresponding integers (which could represent pages and price)?

ex:

[("book1", 20, 10),
 ("book2", 5, 10),
 ("book1", 100, 100)]

becomes

[("book1", 120, 110),
 ("book2", 5, 10)]

Solution

  • With an RDD you can use reduceByKey.

    case class Book(name: String, i: Int, j: Int) {
      // Combine two Books with the same name by summing both counters.
      def +(b: Book): Book =
        if (name == b.name) Book(name, i + b.i, j + b.j)
        else throw new IllegalArgumentException(s"Cannot add Books with different names: $name, ${b.name}")
    }
    
    val rdd = sc.parallelize(Seq(
       Book("book1", 20, 10),
       Book("book2", 5, 10),
       Book("book1", 100, 100)))
    
    val aggRdd = rdd.map(book => (book.name, book))
       .reduceByKey(_ + _) // reduce using our `+` defined on Book
       .map(_._2)          // drop the key; keep only the combined Books
    
    aggRdd.foreach(println)
    // Book(book1,120,110)
    // Book(book2,5,10)
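
  • If you'd rather keep the plain tuples instead of introducing a case class, a minimal sketch (assuming `sc` is an existing SparkContext, as above) keys each record by its title, sums the two counters pairwise with reduceByKey, and maps back to triples:

    val books = sc.parallelize(Seq(
       ("book1", 20, 10),
       ("book2", 5, 10),
       ("book1", 100, 100)))

    val summed = books
       .map { case (title, pages, price) => (title, (pages, price)) }   // key by title
       .reduceByKey { case ((p1, c1), (p2, c2)) => (p1 + p2, c1 + c2) } // sum pairwise
       .map { case (title, (pages, price)) => (title, pages, price) }   // back to triples

    summed.collect().foreach(println)
    // (book1,120,110)
    // (book2,5,10)

    Either way, reduceByKey combines values map-side before the shuffle, so it scales better than groupByKey followed by a manual sum.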