So I am new to Scala and just starting to work with RDDs and functional Scala operations. I am trying to iterate over the values of my pair RDDs and return `Var1` with the average of the values stored in `Var2`, by applying the defined `average` function, so that the final result is a unique list of `Var1` with a single `AvgVar2` associated with each one. I am having a lot of trouble figuring out how to iterate over the values.
*Edit: I have the following type declarations:

```scala
// ID, Var1 and Var2 are all plain Int values; the average is a Double
type ID = Int
type Var1 = Int
type Var2 = Int
type AvgVar2 = Double
```
I have the following function:
```scala
import org.apache.spark.rdd.RDD

def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {
  // Mean of the Var2 values in the array
  def average(as: Array[Var2]): AvgVar2 = {
    var sum = 0.0
    var i = 0
    while (i < as.length) {
      sum += as(i)
      i += 1
    }
    sum / i
  }
  //My attempt at Scala
  rdds.map(x => ((x._1), x._2)).groupByKey().map(x => average(x._1)).collect()
}
```
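The `while` loop in `average` can also be written with the built-in `sum` on `Array`. A minimal sketch, assuming `Var2` is an alias for `Int` and `AvgVar2` for `Double`; the object name `AverageSketch` is only for illustration:

```scala
object AverageSketch {
  // Assumed aliases: Var2 is an Int, the average is a Double
  type Var2 = Int
  type AvgVar2 = Double

  // Sum the array and divide by its length.
  // For an empty array this is 0.0 / 0, i.e. NaN, rather than an exception.
  def average(as: Array[Var2]): AvgVar2 =
    as.sum.toDouble / as.length
}
```

For example, `AverageSketch.average(Array(3, 12, 6))` gives `7.0`. There is no index counter to get wrong, and the signature is the same as in the question.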
My attempt at Scala is trying to do the following:

1. Map the data into `Var1-Var2` pairs.
2. Group by `Var1` and create an array of the associated `Var2` values.
3. Apply the `average` function to each array of `Var2`.
4. Return each `AvgVar2` with its associated `Var1` as a collection of RDDs.
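The steps described above can be sketched locally with plain Scala collections, with a `Seq` standing in for the RDD so it runs without Spark. This is a minimal sketch under assumptions: the object name `StepsSketch` is hypothetical, the type aliases are redeclared inside it as `Int`/`Double`, and the result is sorted only to make the output deterministic. Because it drops the `ID` first, it also handles one `Var1` appearing under several IDs.

```scala
object StepsSketch {
  // Assumed aliases; a Seq plays the role of the RDD so this runs without Spark
  type ID = Int
  type Var1 = Int
  type Var2 = Int
  type AvgVar2 = Double

  def average(as: Array[Var2]): AvgVar2 = as.sum.toDouble / as.length

  def foo(rows: Seq[(ID, Iterable[(Var1, Var2)])]): Seq[(Var1, AvgVar2)] =
    rows
      // Step 1: drop the ID and flatten into (Var1, Var2) pairs
      .flatMap { case (_, pairs) => pairs }
      // Step 2: group by Var1, giving Map[Var1, Seq[(Var1, Var2)]]
      .groupBy(_._1)
      // Step 3: apply average to the Var2 values of each group
      .map { case (v1, pairs) => (v1, average(pairs.map(_._2).toArray)) }
      // Step 4: return (Var1, AvgVar2) pairs, sorted by Var1
      .toSeq
      .sortBy(_._1)
}
```

On an RDD, the same pipeline would presumably be `rdds.flatMap(_._2).groupByKey().mapValues(vs => average(vs.toArray))` (an untested sketch). Leaving off `.collect()` matters there, because `collect()` returns a local `Array` rather than an `RDD`.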
*Edit: Some sample input data for `rdds`:

```
// RDD[(ID, Iterable[(Var1, Var2)])]
(1, [(1,3), (1,12), (1,6)])
(2, [(2,5), (2,7)])
```

Some sample output data:

```
// RDD[(Var1, AvgVar2)]
(1, 7)
(2, 6)
```
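As a check on the expected output, each value is the mean of the `Var2` values in the corresponding group:

$$
\text{AvgVar2}(1) = \frac{3 + 12 + 6}{3} = \frac{21}{3} = 7, \qquad
\text{AvgVar2}(2) = \frac{5 + 7}{2} = \frac{12}{2} = 6
$$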
*Edit: Line of working Scala code:

```scala
// Every pair in a group shares the same Var1, so take it from the first pair
rdds.map(x => (x._2.head._1, average(x._2.map(it => it._2).toArray)))
```
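A note on extracting `Var1` per group: `x._2.map(it => it._1)` yields a collection holding one `Var1` per pair rather than a single `Var1`, so I would expect casting it with `asInstanceOf[Var1]` to fail at runtime with a `ClassCastException` once Spark evaluates the map. Since every pair in a group shares the same `Var1`, the first pair's value is enough. A small local sketch, with a hypothetical object name and the aliases assumed to be `Int`:

```scala
object Var1Sketch {
  // Assumed aliases, matching the Int-based sample data
  type Var1 = Int
  type Var2 = Int

  // One Var1 per pair: for ((1,3), (1,12), (1,6)) this is the collection 1, 1, 1
  def var1Values(pairs: Iterable[(Var1, Var2)]): Iterable[Var1] = pairs.map(_._1)

  // All pairs in a group share the same Var1, so the first pair's value suffices.
  // head throws NoSuchElementException on an empty group.
  def groupVar1(pairs: Iterable[(Var1, Var2)]): Var1 = pairs.head._1
}
```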
Considering `ID` = `Var1`, a simple `.map()` will solve it:
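```scala
import org.apache.spark.rdd.RDD

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
  // Mean of the second element of each pair
  def average(as: Iterable[(Int, Int)]): Double = {
    as.map(_._2).reduce(_ + _) / as.size.toDouble
  }
  // Keep the key and replace the Iterable with its average
  rdds.map(x => (x._1, average(x._2)))
}
```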
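To try the answer's logic without a cluster, the same `map` can be run on a local `Seq`. This minimal sketch (hypothetical object name `AnswerSketch`) swaps `reduce(_ + _)` for `sum`, since `reduce` throws an `UnsupportedOperationException` on an empty `Iterable` while `sum` returns `0`, which makes the average `NaN` instead:

```scala
object AnswerSketch {
  // Local stand-in: Seq replaces RDD so the logic runs without a SparkContext
  def average(as: Iterable[(Int, Int)]): Double =
    // sum (unlike reduce) is defined for an empty group: 0 / 0.0 gives NaN
    as.map(_._2).sum / as.size.toDouble

  // Keep the key (ID, assumed equal to Var1) and replace the pairs with their average
  def foo(rows: Seq[(Int, Iterable[(Int, Int)])]): Seq[(Int, Double)] =
    rows.map(x => (x._1, average(x._2)))
}
```

Like the answer, this keeps `ID` as the output key, so it relies on `ID` and `Var1` carrying the same value.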
Output:

```
val input = sc.parallelize(List((1, Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))
scala> foo(input).collect
res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))
```
EDITED (keeping the question's `average()` signature, which takes an `Array`):
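```scala
import org.apache.spark.rdd.RDD

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {
  // Mean of the values in the array
  def average(as: Array[Int]): Double = {
    as.reduce(_ + _) / as.size.toDouble
  }
  // Extract the second element of each pair into an Array, then average it
  rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
}
```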