I have the following function and want to split the Iterable into an RDD of Var1 paired with an array of Var2. What is the best way to do it?
Here is my function:
def foo(rdds: RDD[(ID, Iterable[(Var1,Var2)])]) : RDD[(Var1,Array[Var2])] = {
  rdds.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], (x._2.map(it => it._2).toArray)))
}
And here is my sample input data:
//RDD[(ID, Iterable[(Var1,Var2)...])]
RDD[("ID1",Iterable[(1,4),(1,8),(1,15)])],
RDD[("ID2",Iterable[(2,18),(2,29)])]
And I want the output to look like this:
//RDD[(Var1,Array[Var2])]
RDD[1,(4,8,15)],
RDD[2,(18,29)]
My code above works, but it seems unnecessary to traverse x._2 twice to get both parts of the Iterable, and I don't like having to explicitly perform an asInstanceOf[Var1] conversion to change the Iterable type.
Is there a better way to extract the elements from the Iterable and place them in the new RDD grouped by Var1?
You can simply change how you obtain Var1. In the data you have shown, all Var1 values within a given Iterable are the same, so there is no need to iterate through the Iterable twice: just read Var1 from the first element. You can do something like:
def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, Array[Var2])] = {
  // Var1 is the same for every pair in a group, so read it from the first pair
  rdds.map(x => (x._2.head._1, x._2.map(_._2).toArray))
}
This gives the required result.
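To try the idea without a Spark cluster, here is a minimal sketch of the per-record logic on plain Scala collections. It assumes Var1 = Int, Var2 = Int and ID = String to match the sample data; the names `toGroup`, `sample` and `result` are hypothetical. In Spark, the same function would be passed to `rdds.map`.

```scala
// Assumption: concrete types chosen to match the sample data.
type Var1 = Int
type Var2 = Int
type ID = String

// Per-record logic from the answer: take Var1 from the first pair
// (all pairs in a group share the same Var1) and collect every Var2.
def toGroup(x: (ID, Iterable[(Var1, Var2)])): (Var1, Array[Var2]) =
  (x._2.head._1, x._2.map(_._2).toArray)

// Hypothetical stand-in for the RDD contents, built from the sample input.
val sample: Seq[(ID, Iterable[(Var1, Var2)])] = Seq(
  ("ID1", Iterable((1, 4), (1, 8), (1, 15))),
  ("ID2", Iterable((2, 18), (2, 29)))
)

val result: Seq[(Var1, Array[Var2])] = sample.map(toGroup)

// Arrays do not print their contents by default, so format them explicitly.
result.foreach { case (k, vs) => println(s"$k -> ${vs.mkString("(", ",", ")")}") }
// prints:
// 1 -> (4,8,15)
// 2 -> (18,29)
```

Note that `head` is a constant-time lookup on the first element, so the only full traversal left is the `map` that collects the Var2 values.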
NOTE: This works only when x._2 has at least one element; head throws a NoSuchElementException on an empty collection. If x._2 might be empty, x._2.headOption.getOrElse((defaultVar1, defaultVar2))._1 might be a good idea.
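A minimal sketch of that empty-safe variant, again on plain Scala collections and assuming Var1 = Var2 = Int. `defaultVar1` and `defaultVar2` are hypothetical placeholders (set to 0 here), and `toGroupSafe` is an illustrative name, not part of any API.

```scala
// Assumption: concrete types chosen to match the sample data.
type Var1 = Int
type Var2 = Int
type ID = String

// Hypothetical defaults, used only when a group has no elements.
val defaultVar1: Var1 = 0
val defaultVar2: Var2 = 0

// headOption avoids the NoSuchElementException that head throws on an
// empty Iterable; mapping an empty Iterable simply yields an empty Array.
def toGroupSafe(x: (ID, Iterable[(Var1, Var2)])): (Var1, Array[Var2]) =
  (x._2.headOption.getOrElse((defaultVar1, defaultVar2))._1, x._2.map(_._2).toArray)

val empty = toGroupSafe(("ID3", Iterable.empty[(Var1, Var2)]))
val nonEmpty = toGroupSafe(("ID2", Iterable((2, 18), (2, 29))))

println(s"${empty._1} -> ${empty._2.length} elements") // prints: 0 -> 0 elements
```

A slightly tidier spelling of the same idea is `x._2.headOption.map(_._1).getOrElse(defaultVar1)`, which does not need a dummy Var2 value.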