Tags: scala, distributed-computing, apache-spark

Apache Spark - How to zip multiple RDDs


Let's say I have a bunch of RDDs, maybe RDD[Int], and I have a function that defines an operation on a sequence of ints and returns an int, like a fold: f: Seq[Int] => Int.

If I have a sequence of RDDs, Seq[RDD[Int]], how do I apply the function element-wise and return a single new RDD of the resulting values? There doesn't seem to be a zipPartitions method in Spark that accomplishes this.
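
In other words, I'm looking for something like the following (a made-up signature, not an existing Spark API):

    import org.apache.spark.rdd.RDD

    // Hypothetical: zip a whole sequence of RDDs element-wise,
    // then reduce each resulting tuple of ints with f.
    def zipWith(rdds: Seq[RDD[Int]])(f: Seq[Int] => Int): RDD[Int] = ???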


Solution

  • At some point the elements of the Seq[Int] need to be bound to the parameters of f. Whether this happens beforehand by creating a collection ("materializing the lists") or one-by-one in the manner of partial function application, at some point there has to be a collection-like data structure that contains all of the elements. Certainly, once inside f, they all need to be in the same place.

    Here is a slightly more functional-style implementation of Spiro's makeZip function:

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.rdd.RDD

    def makeZip(xs: ListBuffer[RDD[Double]]): RDD[ListBuffer[Double]] = {
      // seed the fold with single-element ListBuffers built from the first RDD
      val init = xs(0).map { ListBuffer(_) }
      // zip in each remaining RDD, appending its value to the running buffer
      // (+= returns the buffer itself, so map yields RDD[ListBuffer[Double]])
      xs.drop(1).foldLeft(init) {
        (rddS, rddXi) => rddS.zip(rddXi).map { case (s, x) => s += x }
      }
    }
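
    With makeZip in hand, the original question is answered by mapping f over the zipped result. A minimal usage sketch (the input values and the sum-based f are illustrative assumptions; note that RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition, which parallelizing equal-length sequences with the same number of slices guarantees):

    // assumes an existing SparkContext named sc
    val rdds = ListBuffer(
      sc.parallelize(Seq(1.0, 2.0, 3.0), 2),
      sc.parallelize(Seq(10.0, 20.0, 30.0), 2)
    )
    val f: Seq[Double] => Double = _.sum   // an example fold over each zipped tuple
    val result: RDD[Double] = makeZip(rdds).map(f)
    result.collect()                       // Array(11.0, 22.0, 33.0)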