scala, apache-spark, distributed-computing

Serialising temp collections created in Spark executors during task execution


I'm trying to find an effective way of writing collections created inside tasks to the output files of the job. For example, if we iterate over an RDD using foreach, we can create data structures that are local to the executor, e.g. the ListBuffer arr in the following code snippet. My problem is: how do I serialise arr and write it to a file? (1) Should I use the FileWriter API, or will Spark's saveAsTextFile work? (2) What are the advantages of using one over the other? (3) Is there a better way of achieving the same thing?

PS: The reason I am using foreach instead of map is that I might not be able to transform all my RDD rows, and I want to avoid getting null values in the output.

import scala.collection.mutable.ListBuffer

val dataSorted: RDD[(Int, Int)] = <Some Operation>
val arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
dataSorted.foreach { case (e, r) =>
  if (e > 1000) {
    arr += (("a", "b"))
  }
}

Thanks, Devj


Solution

  • You should not use driver-side variables, but Accumulators - there are articles about them with code examples here and here, and this question may also be helpful - it contains a simplified code example of a custom AccumulatorParam.

    Write your own accumulator that is able to add (String, String) pairs, or use the built-in CollectionAccumulator. It is an implementation of AccumulatorV2, the new accumulator API introduced in Spark 2; a minimal sketch of this approach is shown after the code below.

    Another way is to use Spark's built-in filter and map functions - thanks @ImDarrenG for suggesting flatMap (that variant is also sketched below), but I think filter and map will be easier:

    val result: Array[(String, String)] = someRDD
        .filter(x => x._1 > 1000) // keep only the good rows
        .map(x => ("a", "b"))
        .collect()                // bring the results back to the driver as an array
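
    A minimal sketch of the accumulator approach, assuming Spark 2.x, that sc is your SparkContext, and that dataSorted is the RDD[(Int, Int)] from the question (the ("a", "b") pair and the e > 1000 threshold are just the question's placeholders):

    // CollectionAccumulator is a built-in AccumulatorV2 that gathers values per task
    // and merges them on the driver
    val acc = sc.collectionAccumulator[(String, String)]("pairs")

    dataSorted.foreach { case (e, r) =>
      if (e > 1000) {
        acc.add(("a", "b")) // safe inside executors; merged results appear on the driver
      }
    }

    // after the action completes, read the merged values on the driver
    val collected: java.util.List[(String, String)] = acc.value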
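
    And a sketch of the flatMap variant mentioned above - rows that cannot be transformed simply yield an empty result, so no nulls end up in the output (again using the question's placeholder values):

    val resultFlatMap: Array[(String, String)] = dataSorted
      .flatMap { case (e, r) =>
        if (e > 1000) Some(("a", "b")) else None // rows mapping to None are dropped
      }
      .collect()

    If the output is large, you can also call saveAsTextFile on the filtered/mapped RDD instead of collect, so that writing stays distributed across the executors.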