Tags: scala, apache-spark, functional-programming, rdd, set-intersection

Apache Spark: RDD multiple passes with a simple operation


I've encountered this problem while learning the Apache Spark framework. Consider the following simple RDD:

scala> val rdd1 = sc.parallelize(List((1, Set("C3", "C2")), 
                                      (2, Set("C1", "C5", "C3")), 
                                      (3, Set("C2", "C7"))))
rdd1: RDD[(Int, Set[String])]

I want to intersect the Set of each element in rdd1 with the Sets of every other element of the same rdd1, so that the result would be of the form:

newRDD: RDD[(Int, Int, Set[String])]
// and newRDD.collect will look like:
newRDD: Array[(Int, Int, Set[String])] = Array((1, 1, Set("C3", "C2")), (1, 2, Set("C3")), (1, 3, Set("C2")),
                                               (2, 1, Set("C3")), (2, 2, Set("C1", "C5", "C3")), (2, 3, Set()),
                                               (3, 1, Set("C2")), (3, 2, Set()), (3, 3, Set("C2", "C7")))

I tried nesting rdd1 like so:

scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})

however, this throws a 'Task not serializable' exception.

Now, if I wanted to avoid rdd1.collect() or any other action before performing

scala> val newRDD = rdd1 map (x => {rdd1 map (y => (x._1, y._1, x._2.intersect(y._2)))})

would it be possible to achieve the desired newRDD?


Solution

  • The reason you are getting the 'Task not serializable' exception is that you are trying to use one RDD inside a map over another RDD; in that case Spark would have to serialise the second RDD and ship it to the executors, which is not allowed. Normally you would solve this kind of problem with a join:

    val newRDD = rdd1.cartesian(rdd1).map { case ((a, aSet), (b, bSet)) =>
       (a, b, aSet.intersect(bSet))
    }
    

    Here the cartesian join pairs every element of the RDD with every element (including itself) in a new RDD, so you can intersect the two sets of each pair within a single transformation.
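
    For reference, here is a minimal self-contained sketch of the cartesian approach as a standalone app rather than a spark-shell session (the SparkSession setup and the IntersectSets object name are illustrative additions, not part of the original question):

    import org.apache.spark.sql.SparkSession

    object IntersectSets {
      def main(args: Array[String]): Unit = {
        // Illustrative local session; in spark-shell `sc` already exists.
        val spark = SparkSession.builder()
          .appName("rdd-set-intersection")
          .master("local[*]")
          .getOrCreate()
        val sc = spark.sparkContext

        val rdd1 = sc.parallelize(List(
          (1, Set("C3", "C2")),
          (2, Set("C1", "C5", "C3")),
          (3, Set("C2", "C7"))))

        // cartesian pairs every element of rdd1 with every element of rdd1
        // (including itself), so the intersection runs inside one transformation
        // without nesting RDDs.
        val newRDD = rdd1.cartesian(rdd1).map { case ((a, aSet), (b, bSet)) =>
          (a, b, aSet.intersect(bSet))
        }

        newRDD.collect().foreach(println)
        // e.g. (1,1,Set(C3, C2)), (1,2,Set(C3)), (1,3,Set(C2)), ...

        spark.stop()
      }
    }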