
Efficiently concatenate array of arrays RDD by index of inner array


I am using Databricks to run a Spark cluster (v3.0.1) with Scala (v2.12). My Scala files are compiled into a JAR, and I run the job using spark-submit from the Databricks UI.

The logic of the program starts with creating a list of random seeds and parallelizing it using the line:

val myListRdd = sc.parallelize(myList, partitions)

Next, I want to run a processing function f(...args) on this RDD, passing each element of myListRdd as one of its arguments. The function returns an Array[Array[Double]], so in Scala the call looks like:

val result = myListRdd.map(f(_, ...<more-args>))
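For reference, here is a minimal local sketch of the map step. The real signature of f is elided in the question, so this assumes a hypothetical f that takes a seed plus row and column counts (`f`, `rows`, `cols` and `MapStepSketch` are illustrative names only). The point is the type: because each seed maps to one Array[Array[Double]], `myListRdd.map(...)` yields an RDD[Array[Array[Double]]] with one element per seed. Defining f in a top-level object is usually enough to keep the closure that Spark ships to executors serializable.

```scala
import scala.util.Random

object MapStepSketch {
  // Hypothetical stand-in for f(...args): builds a rows x cols grid of doubles from one seed
  def f(seed: Long, rows: Int, cols: Int): Array[Array[Double]] = {
    val rng = new Random(seed)            // same seed => same values
    Array.fill(rows, cols)(rng.nextDouble())
  }

  // Local analogue of myListRdd.map(f(_, ...)): one Array[Array[Double]] per seed
  def runLocally(seeds: Seq[Long], rows: Int, cols: Int): Seq[Array[Array[Double]]] =
    seeds.map(seed => f(seed, rows, cols))
}
```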

Next, I want to efficiently combine these output arrays of arrays using the following logic.

Example outputs from f(...args):

Output 1: ((1.0, 1.1, 1.2), (1.3, 1.4, 1.5), ...)
Output 2: ((2.0, 2.1, 2.2), (2.3, 2.4, 2.5), ...)
Output 3: ((3.0, 3.1, 3.2), (3.3, 3.4, 3.5), ...)
... and so on

Since these are the outputs of multiple calls to f(...args), I would like to use Spark RDD operations to produce a final output that looks like:

Type: Array[Array[Double]]
Value: ((1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2, ...), (1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5, ...), ...)
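Put differently, row i of the result is row i of every output, concatenated in output order. On a plain local collection (not an RDD) that is a transpose followed by a flatten. A minimal sketch, assuming every output has the same number of rows (Scala's `transpose` throws an exception otherwise); `TransposeConcat` and `concatByRow` are hypothetical names:

```scala
object TransposeConcat {
  // Row i of the result = row i of outputs(0) ++ row i of outputs(1) ++ ...
  def concatByRow(outputs: Seq[Array[Array[Double]]]): Array[Array[Double]] =
    outputs
      .map(_.toSeq)                                 // Seq of outputs, each a Seq of rows
      .transpose                                    // Seq of row groups: all row-i's together
      .map(rows => rows.flatMap(_.toSeq).toArray)   // join each row group in output order
      .toArray
}
```

This only describes the target shape; it still needs all outputs in one place, which is the cost the question wants to avoid.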

I am new to Spark and Scala, so I am not sure how to translate this logic into code. I tried using flatMap instead of map in the snippet above, but it does not give me exactly the output I want. If I try to convert the output RDD to a DataFrame using the collect operation, the job takes a long time to execute, and I would still need to run a concatenation function over the DataFrame.
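The flatMap attempt likely fails because flatMap flattens each Array[Array[Double]] into its individual rows, so the result is one long collection of rows with no record of which row index (or which seed) each came from. The local sketch below (plain Scala collections; all names hypothetical) contrasts the two: `flatRows` mirrors a bare flatMap, while `keyedConcat` tags each row with its row index and output index before flattening, then groups by row index and concatenates in output order.

```scala
object FlatMapVsKeyed {
  type Output = Array[Array[Double]]

  // What a bare flatMap produces: every row of every output, grouping lost
  def flatRows(outputs: Seq[Output]): Seq[Array[Double]] =
    outputs.flatMap(_.toSeq)

  // Keep the grouping: tag rows as (rowIdx, (outputIdx, row)), group, then join
  def keyedConcat(outputs: Seq[Output]): Array[Array[Double]] = {
    val tagged: Seq[(Int, (Int, Array[Double]))] =
      outputs.zipWithIndex.flatMap { case (out, outIdx) =>
        out.toSeq.zipWithIndex.map { case (row, rowIdx) => (rowIdx, (outIdx, row)) }
      }
    tagged
      .groupBy(_._1)        // rowIdx -> every tagged row with that index
      .toSeq
      .sortBy(_._1)         // restore row order
      .map { case (_, rows) =>
        rows.map(_._2).sortBy(_._1).flatMap(_._2.toSeq).toArray // join in output order
      }
      .toArray
  }
}
```

On an RDD the same idea could be expressed with `zipWithIndex`, `flatMap` and `groupByKey`, but that shuffles every row across the cluster; treat it as an untested outline rather than a recommended solution.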


Solution

  • If you have multiple instances of a type (in this case Array[Array[Double]]) and you need to combine them into a single instance of that type, then you probably want to fold() (or possibly reduce()).

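    // Example outputs of f(...), matching the question
    val output1 = Array(Array(1.0, 1.1, 1.2), Array(1.3, 1.4, 1.5))
    val output2 = Array(Array(2.0, 2.1, 2.2), Array(2.3, 2.4, 2.5))
    val output3 = Array(Array(3.0, 3.1, 3.2), Array(3.3, 3.4, 3.5))

    // Append row idx of each output onto row idx of the accumulator
    List(output1,output2,output3)
      .foldLeft(Array.empty[Array[Double]]){
        case (acc, aad) => aad.indices.map{idx =>
          acc.lift(idx).fold(aad(idx))(_ ++ aad(idx))
        }.toArray
      }
    //res0: Array[Array[Double]] =
    // Array(Array(1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2)
    //     , Array(1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5))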
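The fold above runs on a local List. On the RDD itself, `RDD.reduce` and `RDD.fold` accept the same kind of merge function, but the Spark programming guide asks for a function that is commutative and associative. Row concatenation is associative but not commutative, so partition results may be merged in a different order than the seeds. One order-preserving option, offered as an assumption rather than a tested recipe: reduce inside each partition with `mapPartitions`, then `collect()` the per-partition results (they come back in partition order) and merge those on the driver. The sketch below models that locally with nested Seqs standing in for partitions; `PartitionedConcat`, `merge` and `combine` are hypothetical names.

```scala
object PartitionedConcat {
  type Output = Array[Array[Double]]

  // Row-wise append; unlike the foldLeft above, keeps extra rows from either side
  def merge(acc: Output, next: Output): Output =
    Array.tabulate(math.max(acc.length, next.length)) { idx =>
      val left  = if (idx < acc.length) acc(idx) else Array.empty[Double]
      val right = if (idx < next.length) next(idx) else Array.empty[Double]
      left ++ right
    }

  // Local model of mapPartitions + collect: reduce within each partition,
  // keep the partition results in order, then reduce them on the "driver"
  def combine(partitions: Seq[Seq[Output]]): Option[Output] =
    partitions
      .filter(_.nonEmpty)
      .map(_.reduce((a, b) => merge(a, b)))
      .reduceOption((a, b) => merge(a, b))
}
```

Swapping the partition order changes the result, which is why a merge that relies on completion order is risky here.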