scala · apache-spark · key-pair · java-pair-rdd

Iterating over an RDD Iterable in Scala


So I am new to Scala and just starting to work with RDDs and functional Scala operations.

I am trying to iterate over the values of my pair RDD and return each Var1 with the average of the values stored in Var2 (by applying the average function defined below), so that the final result is a unique list of Var1 values, each paired with a single AvgVar2. I am having a lot of trouble figuring out how to iterate over the values.

*edit: I have the following type declarations:

case class ID(Var1: Int, Var2: Int) extends Serializable

I have the following function:

  def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {

    def average(as: Array[Var2]): AvgVar2 = {
      var sum = 0.0
      var i = 0
      while (i < as.length) {
        sum += as(i)
        i += 1
      }
      sum / i
    }

    //My attempt at Scala
    rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}

My attempt at Scala is trying to do the following:

  1. Split the RDD pair Iterable into key-value pairs of Var1-Var2.
  2. Group by the key of Var1 and create an array of associated Var2.
  3. Apply my average function to each array of Var2
  4. Return the AvgVar2 with the associated Var1 as a collection of RDDs
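Since the per-element operations on an Iterable are ordinary Scala collection methods, the four steps above can be sketched locally with a plain List standing in for the RDD (the data matches the sample below; names are illustrative):

```scala
// Mock of RDD[(ID, Iterable[(Var1, Var2)])] as a plain List (no SparkContext needed)
val rdds = List(
  (1, Iterable((1, 3), (1, 12), (1, 6))),
  (2, Iterable((2, 5), (2, 7)))
)

// Steps 1-4: for each group, take its Var1 and average its Var2 values
val result = rdds.map { case (_, vs) =>
  val var1 = vs.head._1                          // every Var1 in a group is the same
  val avg  = vs.map(_._2).sum / vs.size.toDouble // average the Var2 values
  (var1, avg)
}
// result: List((1, 7.0), (2, 6.0))
```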

*Edit:

Some sample input data for rdds:

//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]

Some sample output data:

//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]

*Edit: Line of working Scala code:

rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))

Solution

  • Considering ID = Var1, a simple .map() will solve it:

    def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
    
      def average(as: Iterable[(Int, Int)]): Double = {
        as.map(_._2).reduce(_+_)/as.size.toDouble
      }
    
      rdds.map(x => (x._1, average(x._2)))
    }
    

    Output:

    val input = sc.parallelize(List((1,Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))
    
    scala> foo(input).collect
    res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))
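    Note that `average()` here operates on a plain Scala Iterable, so the per-key logic can be pulled out and checked without a SparkContext (`avgVar2` is just an illustrative name):

    ```scala
    // Average the Var2 (second) components of a plain collection.
    // This is exactly what runs per key inside rdds.map above.
    def avgVar2(vs: Iterable[(Int, Int)]): Double =
      vs.map(_._2).sum / vs.size.toDouble

    println(avgVar2(Iterable((1, 3), (1, 12), (1, 6))))  // prints 7.0
    println(avgVar2(Iterable((2, 5), (2, 7))))           // prints 6.0
    ```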
    

    EDITED: (average() with same signature):

    def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
    
      def average(as: Array[Int]): Double = {
        as.reduce(_+_)/as.size.toDouble
      }
    
      rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
    }
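
If the data ever arrives as flat (Var1, Var2) pairs rather than pre-grouped Iterables, the same averages can be computed without materializing groups by folding a (sum, count) pair per key with Spark's aggregateByKey. A sketch (`seqOp`/`combOp` are illustrative names; the fold logic itself runs without a SparkContext):

```scala
// seqOp: fold one Var2 value into a (sum, count) accumulator within a partition
def seqOp(acc: (Double, Long), v: Int): (Double, Long) = (acc._1 + v, acc._2 + 1)

// combOp: merge per-partition accumulators
def combOp(a: (Double, Long), b: (Double, Long)): (Double, Long) =
  (a._1 + b._1, a._2 + b._2)

// With Spark this would plug straight into aggregateByKey:
//   pairs.aggregateByKey((0.0, 0L))(seqOp, combOp).mapValues { case (s, n) => s / n }

// Local check of the fold logic on key 1's Var2 values from the sample data:
val (sum, n) = List(3, 12, 6).foldLeft((0.0, 0L))(seqOp)
println(sum / n)  // prints 7.0
```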