I get a warning when using an RDD in a for comprension, and I'm not sure if it's something I'm doing wrong. If I do this:
val sc = new SparkContext(...)
val anRDD = sc.parallelize(List(
("a", List(1, 2, 3)),
("b", List(4)),
("c", List(5, 6))
))
for {
(someString, listOfInts) <- anRDD
someInt <- listOfInts
} yield (someString, someInt)
Then I get this output:
warning: `withFilter' method does not yet exist on org.apache.spark.rdd.RDD[(String, List[Int])], using `filter' method instead
(someString, listOfInts) <- anRDD
But it does still successfully return a FlatMappedRDD[(String, Int)]. Am I doing something wrong? Or is it safe to ignore this warning?
Update: I would also accept as an answer how the for-comprehension converts these operations to map/flatMap/filter calls, since I didn't think there'd be any filter or withFilter calls required. I assumed it would be equivalent to something similar to this:
anRDD.flatMap(tuple => tuple._2.map(someInt => (tuple._1, someInt)))
But this doesn't include any filter or withFilter calls, which seems to be the source of the warning.
Oh, I'm using Spark 1.2.0, Scala 2.10.4, and this is all within the REPL.
First, I am no expert, but I have done some digging, and here is what I found: I compiled the code with scalac's -print flag (since JavaDecompiler was failing for some reason), which prints the program with all Scala-specific features removed. There, I saw:
test.this.anRDD().filter({
(new anonymous class anonfun$1(): Function1)
}).flatMap({
(new anonymous class anonfun$2(): Function1)
}, ClassTag.apply(classOf[scala.Tuple2]));
You will notice the filter call, so I checked the anonfun$1 it refers to:
public final boolean apply(Tuple2<String, List<Object>> check$ifrefutable$1)
{
Tuple2 localTuple2 = check$ifrefutable$1;
boolean bool;
if (localTuple2 != null) {
bool = true;
} else {
bool = false;
}
return bool;
}
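Translated back to source, the generated code corresponds roughly to the following sketch (using a plain List so it runs without Spark; the names are illustrative, not what the compiler actually emits):

```scala
object Desugared {
  val anList = List(
    ("a", List(1, 2, 3)),
    ("b", List(4)),
    ("c", List(5, 6))
  )

  // The for-comprehension from the question...
  val viaFor = for {
    (someString, listOfInts) <- anList
    someInt <- listOfInts
  } yield (someString, someInt)

  // ...expands to a pattern check (withFilter/filter) followed by flatMap/map
  val desugared = anList
    .withFilter {
      case (_, _) => true   // every Tuple2 passes the check
      case _      => false  // fallback the compiler generates for refutable patterns
    }
    .flatMap { case (someString, listOfInts) =>
      listOfInts.map(someInt => (someString, someInt))
    }
}
```

Both produce List(("a",1), ("a",2), ("a",3), ("b",4), ("c",5), ("c",6)).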
So, if you put all of this together, the filter is happening in the comprehension because the tuple pattern (someString, listOfInts) in the generator makes the compiler insert a check that filters out anything that is NOT a Tuple2.
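That check matters when the pattern is genuinely refutable. Against a List[Any], for example, non-matching elements are silently dropped instead of throwing a MatchError (illustrative snippet, not from the question's code):

```scala
// The compiler-inserted filter drops elements the pattern cannot match
val mixed: List[Any] = List((1, "a"), "not a tuple", (2, "b"))

// "not a tuple" is filtered out before the yield runs
val strings = for ((n: Int, s: String) <- mixed) yield s
```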
And the preference is to use withFilter instead of filter (not sure why at the moment). You can see that by decompiling a regular List instead of an RDD:
object test {
val regList = List(
("a", List(1, 2, 3)),
("b", List(4)),
("c", List(5, 6))
)
val foo = for {
(someString, listOfInts) <- regList
someInt <- listOfInts
} yield (someString, someInt)
}
Which decompiles to:
test.this.regList().withFilter({
(new anonymous class anonfun$1(): Function1)
}).flatMap({
(new anonymous class anonfun$2(): Function1)
}, immutable.this.List.canBuildFrom()).$asInstanceOf[List]();
So, it is the same thing, except it uses withFilter where it can, and falls back to filter where it cannot. That fallback is exactly what the warning on the RDD is telling you, so it is safe to ignore.
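As for why withFilter is preferred: it evaluates lazily and avoids allocating an intermediate collection; the predicate is fused into the subsequent map/flatMap/foreach call. A small illustration on a plain List (counting predicate calls to show when it actually runs):

```scala
var calls = 0
val xs = List(1, 2, 3, 4)

// withFilter returns a lazy wrapper; the predicate has not run yet
val pending = xs.withFilter { x => calls += 1; x % 2 == 0 }
assert(calls == 0)

// The predicate runs element-by-element as part of map,
// with no intermediate List allocated in between
val result = pending.map(_ * 10)
assert(calls == 4)
assert(result == List(20, 40))
```

By contrast, xs.filter(...) would eagerly build a new List before map ever ran, which is wasted work inside a desugared for-comprehension.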