I'm trying to find an effective way of writing collections created inside tasks to the output files of the job. For example, if we iterate over an RDD using foreach, we can create data structures that are local to the executor, e.g. the ListBuffer arr in the following code snippet. My problem is: how do I serialise arr and write it to a file?
(1) Should I use the FileWriter API, or will Spark's saveAsTextFile work?
(2) What are the advantages of using one over the other?
(3) Is there a better way of achieving the same thing?
PS: The reason I am using foreach instead of map is that I might not be able to transform all of my RDD rows, and I want to avoid getting null values in the output.
import scala.collection.mutable.ListBuffer

val dataSorted: RDD[(Int, Int)] = <Some Operation>
val arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
dataSorted.foreach {
  case (e, r) =>
    if (e > 1000) {
      arr += (("a", "b"))  // collect matching pairs into the local buffer
    }
}
Thanks, Devj
You should not use driver-side variables, but Accumulators - there are articles about them with code examples here and here, and this question may also be helpful - it contains a simplified code example of a custom AccumulatorParam.
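For illustration, a simplified sketch of that custom-accumulator idea could look like the following (this uses the pre-Spark-2 AccumulatorParam API mentioned above, which is deprecated in Spark 2 in favour of AccumulatorV2; sc is assumed to be your SparkContext and dataSorted is the RDD from the question):

import org.apache.spark.AccumulatorParam

// Custom parameter object describing how to merge lists of (String, String) pairs
object PairListAccumulatorParam extends AccumulatorParam[List[(String, String)]] {
  def zero(initialValue: List[(String, String)]): List[(String, String)] = Nil
  def addInPlace(l1: List[(String, String)], l2: List[(String, String)]): List[(String, String)] = l1 ++ l2
}

val acc = sc.accumulator(List.empty[(String, String)])(PairListAccumulatorParam)

dataSorted.foreach { case (e, r) =>
  if (e > 1000) acc += List(("a", "b"))  // updates are merged on the driver
}

val collectedPairs: List[(String, String)] = acc.value  // read the merged result on the driver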
Write your own accumulator that is able to add (String, String) pairs, or use the built-in CollectionAccumulator. It is an implementation of AccumulatorV2, the new accumulator API introduced in Spark 2.
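A minimal sketch of the CollectionAccumulator approach might look like this (assuming Spark 2+, the dataSorted RDD from the question, and sc as the SparkContext; the accumulator name "pairs" is just an arbitrary label):

import scala.collection.JavaConverters._

// Register a built-in CollectionAccumulator that gathers (String, String) pairs
val pairsAcc = sc.collectionAccumulator[(String, String)]("pairs")

dataSorted.foreach { case (e, r) =>
  if (e > 1000) {
    pairsAcc.add(("a", "b"))  // safe inside a task; partial results are merged on the driver
  }
}

// On the driver, value returns a java.util.List[(String, String)]
val collected: List[(String, String)] = pairsAcc.value.asScala.toList

Because the accumulator is updated inside an action (foreach), each update is applied exactly once per successful task; inside a transformation it could be re-applied if a task is retried.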
Another way is to use Spark's built-in filter and map functions - thanks @ImDarrenG for suggesting flatMap, but I think filter and map will be easier:
val result: Array[(String, String)] = someRDD
  .filter(x => x._1 > 1000) // keep only the rows that can be transformed
  .map(x => ("a", "b"))
  .collect()                // bring the results back to the driver as an Array
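If the result is too large to collect() to the driver, you can instead write it straight from the executors with saveAsTextFile. A small sketch, assuming a hypothetical output path "hdfs:///tmp/output":

someRDD
  .filter(x => x._1 > 1000)             // keep only the rows that can be transformed
  .map(x => ("a", "b"))
  .map { case (k, v) => s"$k\t$v" }     // format each pair as a tab-separated line
  .saveAsTextFile("hdfs:///tmp/output") // hypothetical path; writes one part-file per partition

This avoids the single-machine memory limit that collect() plus a FileWriter on the driver would impose, which also answers point (1) of the question: saveAsTextFile scales with the cluster, while FileWriter only makes sense for small results that have already been collected to the driver.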