scala rdd iterable

Best way to extract from an RDD Iterable in Scala


If I have the following function and want to split each Iterable into an RDD of Var1 paired with an array of Var2, what is the best way to do it?

Here is my function:

def foo(rdds: RDD[(ID, Iterable[(Var1,Var2)])]) : RDD[(Var1,Array[Var2])] = {
   rdds.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], (x._2.map(it => it._2).toArray)))
}

And here is my sample input data:

// RDD[(ID, Iterable[(Var1, Var2)])], one record per line
("ID1", Iterable((1, 4), (1, 8), (1, 15)))
("ID2", Iterable((2, 18), (2, 29)))

And I want the output to look like this:

// RDD[(Var1, Array[Var2])], one record per line
(1, Array(4, 8, 15))
(2, Array(18, 29))

While my code above works, it seems unnecessary to traverse x._2 twice to get both parts of the Iterable, and I don't like having to explicitly perform an asInstanceOf[Var1] conversion to change the Iterable type.
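A note on the cast: asInstanceOf[Var1] does not pick an element out of the Iterable; it only tells the compiler to treat the whole collection as a Var1. The sketch below assumes Var1 = Int and Var2 = Int (the question does not say what they are) and uses a plain List in place of x._2, so it runs without Spark. With a concrete Var1 like Int, the cast should fail at runtime with a ClassCastException, while taking one element gives the value directly.

```scala
import scala.util.Try

object CastCheck {
  // Stand-in for x._2 from the question, assuming Var1 = Int and Var2 = Int
  val pairs: Iterable[(Int, Int)] = List((1, 4), (1, 8), (1, 15))

  // What the question's first component builds before the cast:
  // an Iterable[Int] with one entry per pair, not a single Int
  val firsts: Iterable[Int] = pairs.map(_._1)

  // asInstanceOf only reinterprets the whole collection; with Var1 = Int
  // this throws ClassCastException at runtime instead of extracting a value
  val castAttempt: Try[Int] = Try(firsts.asInstanceOf[Int])

  // Selecting one element gives the Var1 value without any cast
  val var1: Int = firsts.head
}
```

In Spark the failure would only surface when an action forces the map to run, which can make a cast like this look like it works until the results are actually read.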

Is there a better way to extract the elements from the Iterable and place them in the new RDD grouped by Var1?


Solution

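  • You can just change the way you obtain Var1. In the data you have shown, all Var1 values within one Iterable are the same, so there is no need to iterate through the Iterable twice: take Var1 from the first pair and traverse the pairs only once to collect Var2. This also removes the need for the asInstanceOf[Var1] cast. You can do something like: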

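    import org.apache.spark.rdd.RDD

    def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, Array[Var2])] = {
      // Var1 is the same for every pair in a group, so read it from the first pair
      rdds.map { case (_, pairs) => (pairs.head._1, pairs.map(_._2).toArray) }
    }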
    

    This gives the required result.

    NOTE: This works only when x._2 has at least one element; calling head on an empty Iterable throws a NoSuchElementException. If there is a possibility that x._2 is empty, then x._2.headOption.getOrElse((defaultVar1, defaultVar2))._1 might be a good idea.
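A runnable sketch of the approach above, under some assumptions: a plain Scala Seq stands in for the RDD so it runs without a Spark cluster (RDD.map applies the same per-record function), and ID, Var1 and Var2 are given the hypothetical concrete types String, Int and Int. The helper names fooSafe and regroup are hypothetical. fooSafe uses headOption.map(_._1).getOrElse(defaultVar1), an equivalent form of the note's expression that does not need a default Var2. regroup covers the case the head-based approach assumes away: Var1 values that differ within one ID's group.

```scala
object ExtractSketch {
  // Hypothetical concrete types for the question's placeholders
  type ID = String
  type Var1 = Int
  type Var2 = Int

  // The answer's approach: Var1 comes from the first pair only,
  // so the Iterable is traversed once (to collect the Var2 values).
  def foo(groups: Seq[(ID, Iterable[(Var1, Var2)])]): Seq[(Var1, Array[Var2])] =
    groups.map { case (_, pairs) => (pairs.head._1, pairs.map(_._2).toArray) }

  // Empty-safe variant from the note: fall back to an assumed
  // defaultVar1 when a group has no pairs.
  def fooSafe(groups: Seq[(ID, Iterable[(Var1, Var2)])], defaultVar1: Var1): Seq[(Var1, Array[Var2])] =
    groups.map { case (_, pairs) =>
      (pairs.headOption.map(_._1).getOrElse(defaultVar1), pairs.map(_._2).toArray)
    }

  // If Var1 can differ within one ID's group, regroup on Var1 instead of
  // trusting the first pair. Key order in the resulting Map is not guaranteed.
  def regroup(groups: Seq[(ID, Iterable[(Var1, Var2)])]): Map[Var1, Array[Var2]] =
    groups
      .flatMap(_._2)                                      // every (Var1, Var2) pair
      .groupBy(_._1)                                      // Var1 -> pairs sharing it
      .map { case (k, ps) => (k, ps.map(_._2).toArray) }  // keep only the Var2 values

  // The sample input from the question
  val sample: Seq[(ID, Iterable[(Var1, Var2)])] = Seq(
    ("ID1", Iterable((1, 4), (1, 8), (1, 15))),
    ("ID2", Iterable((2, 18), (2, 29)))
  )
}
```

On the sample data, foo yields (1, Array(4, 8, 15)) and (2, Array(18, 29)). With Spark, the bodies of foo and fooSafe go inside rdds.map unchanged; the regrouping case maps onto rdds.flatMap(_._2).groupByKey().mapValues(_.toArray), which, unlike the head-based version, shuffles data between partitions.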