I am using Databricks to run a Spark cluster (v3.0.1) with Scala (v2.12). I have my Scala code compiled into a JAR, and I am running the job using spark-submit from the Databricks UI.
The logic of the program starts with creating a list of random seeds and parallelizing it using the line:
val myListRdd = sc.parallelize(myList, partitions)
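For context, a minimal placeholder version of this setup would be the following; the seed type (Long), seed count, and partition count here are made up and differ from my real job:

import scala.util.Random

// Placeholder setup: the real seed count and partition count differ.
val partitions = 8
val myList: Seq[Long] = Seq.fill(100)(Random.nextLong())
val myListRdd = sc.parallelize(myList, partitions)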
Next, I wish to run a processing function f(...args) on this RDD, with one of the args being the individual elements of myListRdd. The function has a return type of Array[Array[Double]]. So in Scala it would look like:
val result = myListRdd.map(f(_, ...<more-args>))
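To make the shape of this concrete, here is a dummy stand-in for f with an assumed signature (the names f, nRows, and nCols are placeholders; my real function takes different arguments):

import org.apache.spark.rdd.RDD

// Dummy stand-in for my real f, just to show the call shape:
// it builds nRows rows of nCols Doubles derived from the seed.
def f(seed: Long, nRows: Int, nCols: Int): Array[Array[Double]] =
  Array.tabulate(nRows, nCols)((r, c) => seed.toDouble + r * nCols + c)

val nRows = 2
val nCols = 3

// Each element of myListRdd produces one Array[Array[Double]].
val result: RDD[Array[Array[Double]]] =
  myListRdd.map(seed => f(seed, nRows, nCols))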
Now, I wish to efficiently collect the output array-of-arrays, using the following logic. Example outputs from f(...args):
Output 1: ((1.0, 1.1, 1.2), (1.3, 1.4, 1.5), ...)
Output 2: ((2.0, 2.1, 2.2), (2.3, 2.4, 2.5), ...)
Output 3: ((3.0, 3.1, 3.2), (3.3, 3.4, 3.5), ...)
... so on
Now, as these are the multiple outputs from f(...args), I would like the final output, using some Spark RDD operations, to look like:
Type: Array[Array[Double]]
Value: ((1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2, ...), (1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5, ...), ...)
I am new to Spark and Scala, so I am not able to map my logic to code. I tried using flatMap instead of map in the snippet above, but it does not give exactly the output I want. If I convert the output RDD to a DataFrame via a collect operation, the job takes a long time to execute, and I would still need to run a concatenate function over the DataFrame.
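For reference, here is what the flatMap attempt looks like (using the placeholder names above), which shows why it is not the result I described:

// flatMap flattens one level: every inner Array[Double] row becomes its own
// RDD element (an RDD[Array[Double]]), not the per-index concatenation of
// rows across outputs that I described above.
val flattened = myListRdd.flatMap(seed => f(seed, nRows, nCols))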
If you have multiple instances of a type (in this case Array[Array[Double]]) and you need to combine them into a single instance of that type, then you probably want fold() (or possibly reduce()).
List(output1, output2, output3)
  .foldLeft(Array.empty[Array[Double]]) {
    // acc holds the rows combined so far; aad is the next output's rows
    case (acc, aad) =>
      aad.indices.map { idx =>
        // append this output's row idx to the accumulated row idx,
        // or start with it if the accumulator has no row there yet
        acc.lift(idx).fold(aad(idx))(_ ++ aad(idx))
      }.toArray
  }
//res0: Array[Array[Double]] =
// Array(Array(1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2)
// , Array(1.3, 1.4, 1.5, 2.3, 2.4, 2.5, 3.3, 3.4, 3.5))
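If you also want to avoid collecting every output to the driver before combining, the same element-wise concatenation can be applied at the RDD level. A sketch, assuming result is the RDD[Array[Array[Double]]] from the question (combine, finalResult, and ordered are just illustrative names); note that reduce needs an associative and commutative operation for a deterministic result, so the order in which the per-seed blocks are concatenated inside each row is not guaranteed:

// Same element-wise combine as the foldLeft above, as a standalone function.
def combine(a: Array[Array[Double]], b: Array[Array[Double]]): Array[Array[Double]] =
  b.indices.map { idx =>
    a.lift(idx).fold(b(idx))(_ ++ b(idx))
  }.toArray

// The combining work happens on the executors; only the final
// Array[Array[Double]] comes back to the driver.
val finalResult: Array[Array[Double]] = result.reduce(combine)

// If the concatenation order must follow the seed order exactly,
// collecting first and folding on the driver preserves it:
val ordered: Array[Array[Double]] =
  result.collect().foldLeft(Array.empty[Array[Double]])(combine)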