Tags: java, scala, apache-spark, return, rdd

Apache Spark Accumulable addInPlace requires return of R1? Or any value?


From the Spark source code for Accumulable, addInPlace is the method that merges values of the same Accumulable from different partitions:

/**
 * Merge two accumulated values together. Is allowed to modify and return the first value
 * for efficiency (to avoid allocating objects).
 *
 * @param r1 one set of accumulated data
 * @param r2 another set of accumulated data
 * @return both data sets merged together
 */
def addInPlace(r1: R, r2: R): R

I am assuming I can return any value I want when I define addInPlace in my implementation of AccumulableParam, and that the accumulator will then use whatever I return in place of r1.

My boss thinks that the r1 passed in is the only thing allowed in a return statement. This is starting to sound Ann-Landers-ish: who is right?

There's a case where I just want to throw away r1 and replace it with the object in r2, which would be the new value of this merged accumulator.

Can I just return r2, or must I do a deep copy into r1, as my (much, much more experienced Java programmer) boss thinks? To be clear, while Spark is of course written in Scala, I am writing a class implementing AccumulableParam in Java.
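
For concreteness, here is a minimal sketch of the two variants I am weighing (written in Scala for brevity rather than the Java I am actually using, and with hypothetical names):

import org.apache.spark.AccumulableParam
import scala.collection.mutable.{Map => MMap}

object ReplaceParam extends AccumulableParam[MMap[String, Int], MMap[String, Int]] {
  def zero(initialValue: MMap[String, Int]): MMap[String, Int] = initialValue

  def addAccumulator(r: MMap[String, Int], t: MMap[String, Int]): MMap[String, Int] =
    addInPlace(r, t)

  // Variant 1 -- what I want to do: discard r1 and return r2's object as-is.
  def addInPlace(r1: MMap[String, Int], r2: MMap[String, Int]): MMap[String, Int] = r2

  // Variant 2 -- what my boss says is required: copy r2 into r1 and return r1.
  // def addInPlace(r1: MMap[String, Int], r2: MMap[String, Int]): MMap[String, Int] = {
  //   r1.clear(); r1 ++= r2; r1
  // }
}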


Solution

  • As a rule of thumb, when performing fold-like operations you should never modify the second argument. We can illustrate why with a simple example. Let's assume we have a simple accumulator like this:

    import org.apache.spark.AccumulatorParam
    import scala.collection.mutable.{Map => MMap}
    
    type ACC = MMap[String, Int]
    
    object DummyAccumulatorParam extends AccumulatorParam[ACC] {
      def zero(initialValue: ACC): ACC = {
        initialValue
      }
    
      def addInPlace(acc: ACC, v: ACC): ACC = {
        // Deliberately (and incorrectly) mutates the second argument v:
        v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0)
        v
      }
    }
    

    It is not particularly useful, but that doesn't matter. The point is that it modifies the second argument. Let's see if it works:

    val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1)
    
    val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
    rdd.foreach(x => accum1 += x)
    
    accum1.value
    // scala.collection.mutable.Map[String,Int] = Map(x -> 3)
    

    So far so good. We can even create another one, and it still works as expected:

    val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
    rdd.foreach(x => accum2 += x)
    
    accum2.value
    // scala.collection.mutable.Map[String,Int] = Map(x -> 3)
    

    Now let's cache the data:

    rdd.cache
    

    and repeat the process:

    val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
    rdd.foreach(x => accum3 += x)
    
    val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
    rdd.foreach(x => accum4 += x)
    

    and check the accumulator value:

    accum4.value
    // scala.collection.mutable.Map[String,Int] = Map(x -> 6)
    

    and RDD content:

    rdd.collect
    // Array[scala.collection.mutable.Map[String,Int]] = 
    //  Array(Map(x -> 1), Map(x -> 3), Map(x -> 6))
    

    So, as you can see, it is not safe to modify or return the second argument. The cached runs go wrong because, once the RDD is cached, the mutable maps handed to addInPlace are the very objects stored in the cache, so writing into v corrupts the cached data and each subsequent pass compounds the error. The same applies to other similar operations, such as the functions passed to fold or aggregate, which are likewise allowed to modify and return their first argument but must not touch the second.
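
    For completeness, here is a sketch of an implementation that honors the contract: it mutates only the first argument (which the docstring explicitly permits, for efficiency) and returns it, leaving the second argument untouched. The object name is mine:

    import org.apache.spark.AccumulatorParam
    import scala.collection.mutable.{Map => MMap}

    object SafeAccumulatorParam extends AccumulatorParam[MMap[String, Int]] {
      // Return a fresh zero value instead of handing back a shared map.
      def zero(initialValue: MMap[String, Int]): MMap[String, Int] =
        MMap.empty[String, Int]

      // Merge v into acc and return acc; v is never modified.
      def addInPlace(acc: MMap[String, Int], v: MMap[String, Int]): MMap[String, Int] = {
        v.foreach { case (k, n) => acc(k) = acc.getOrElse(k, 0) + n }
        acc
      }
    }

    With this version, cached and uncached runs alike yield Map(x -> 3), and the RDD contents stay intact. The same discipline answers the original question: returning r2 directly is unsafe whenever r2 might be shared (for example, cached RDD data), so copy r2's contents into r1 (or into a fresh object) and return that.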