I've encountered this problem while learning the Apache Spark framework. Consider the following simple RDD:
scala> val rdd1 = sc.parallelize(List((1, Set("C3", "C2")),
(2, Set("C1", "C5", "C3")),
(3, Set("C2", "C7"))))
rdd1: RDD[(Int, Set[String])]
I want to intersect the Set of each element in rdd1 with the Sets of every other element in the same rdd1, so that the result would be of the form:
newRDD: RDD[(Int, Int, Set[String])]
// and newRDD.collect will look like:
newRDD: Array[(Int, Int, Set[String])] = Array((1, 1, Set("C3", "C2")), (1, 2, Set("C3")), (1, 3, Set("C2")),
(2, 1, Set("C3")), (2, 2, Set("C1", "C5", "C3")), (2, 3, Set()),
(3, 1, Set("C2")), (3, 2, Set()), (1, 3, Set("C2", "C7")))
I tried nesting rdd1 like so:
scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})
however, this throws a 'Task not serializable' exception.
Now, if I wanted to avoid rdd1.collect() or any other action before performing
scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})
would it be possible to achieve the desired newRDD?
The reason you are getting the 'Task not serializable' exception is that you are trying to use one RDD inside a map over another RDD; in this case Spark would try to serialise the second RDD into the task closure. Normally you would solve this kind of problem with a join:
val newRDD = rdd1.cartesian(rdd1).map { case ((a, aSet), (b, bSet)) =>
(a, b, aSet.intersect(bSet))
}
Here cartesian pairs every element of rdd1 with every other element (including itself) in a new RDD, so you can intersect the Sets of each pair.
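As a minimal sketch (assuming the same rdd1 and the spark-shell's SparkContext sc from your question), collecting the result should give the output you are after:

val rdd1 = sc.parallelize(List(
  (1, Set("C3", "C2")),
  (2, Set("C1", "C5", "C3")),
  (3, Set("C2", "C7"))))

val newRDD = rdd1.cartesian(rdd1).map { case ((a, aSet), (b, bSet)) =>
  (a, b, aSet.intersect(bSet))
}

// Collect only to inspect this small example; the intersections themselves run distributed.
newRDD.collect().sortBy(t => (t._1, t._2)).foreach(println)
// Output along the lines of:
// (1,1,Set(C3, C2))
// (1,2,Set(C3))
// (1,3,Set(C2))
// ...
// (3,3,Set(C2, C7))

Keep in mind that cartesian produces n² pairs, so for large RDDs this grows quickly.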