From the Spark source code for Accumulable, here is the addInPlace method used to merge values of the same Accumulable from different partitions:
/**
* Merge two accumulated values together. Is allowed to modify and return the first value
* for efficiency (to avoid allocating objects).
*
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R
I am assuming I can return any value I want when I define addInPlace in my implementation of AccumulableParam, and that whatever reference I pass in as r1 will then point to whatever I return.
My boss thinks that the r1 passed in is the only thing allowed in a return statement. This is sounding Ann-Landers-ish; who is right?
There's a case where I just want to throw away r1 and replace it with the object in r2, which would be the new value of this merged accumulator.
Can I just return r2, or must I do a deep copy into r1, as my (much, much more experienced Java programmer) boss thinks? To be clear, while Spark itself is of course written in Scala, I am writing a class implementing AccumulableParam in Java.
As a rule of thumb, when performing fold-like operations you should never modify the second argument. We can illustrate why with a simple example. Let's assume we have a simple accumulator like this:
import org.apache.spark.AccumulatorParam
import scala.collection.mutable.{Map => MMap}
type ACC = MMap[String, Int]
object DummyAccumulatorParam extends AccumulatorParam[ACC] {
  def zero(initialValue: ACC): ACC = {
    initialValue
  }
  def addInPlace(acc: ACC, v: ACC): ACC = {
    v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0)
    v
  }
}
It is not particularly useful, but that doesn't matter. The point is that it modifies the second argument. Let's see if it works:
val rdd = sc.parallelize(Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1)), 1)
val accum1 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum1 += x)
accum1.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)
So far so good. We can even create another one and it still works as expected:
val accum2 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum2 += x)
accum2.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 3)
Now let's cache the data:
rdd.cache
and repeat the process:
val accum3 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum3 += x)
val accum4 = sc.accumulator(MMap("x" -> 0))(DummyAccumulatorParam)
rdd.foreach(x => accum4 += x)
and check accumulator value:
accum4.value
// scala.collection.mutable.Map[String,Int] = Map(x -> 6)
and RDD content:
rdd.collect
// Array[scala.collection.mutable.Map[String,Int]] =
// Array(Map(x -> 1), Map(x -> 3), Map(x -> 6))
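The same failure mode can be reproduced without Spark at all. A plain foldLeft whose merge function mutates its second argument corrupts the source collection in exactly the same way (a minimal sketch; badMerge mirrors the addInPlace above, and the FoldHazard name is mine):

```scala
import scala.collection.mutable.{Map => MMap}

object FoldHazard {
  // Mirrors the broken addInPlace: merges into and returns the SECOND argument.
  def badMerge(acc: MMap[String, Int], v: MMap[String, Int]): MMap[String, Int] = {
    v("x") = acc.getOrElse("x", 0) + v.getOrElse("x", 0)
    v
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(MMap("x" -> 1), MMap("x" -> 1), MMap("x" -> 1))

    // First pass looks correct: x -> 3
    println(data.foldLeft(MMap("x" -> 0))(badMerge))

    // But the source data was mutated in place: x values are now 1, 2, 3
    println(data.map(_("x")))

    // A second pass over the same data double counts: x -> 6
    println(data.foldLeft(MMap("x" -> 0))(badMerge))
  }
}
```

The numbers line up with the cached-RDD run above: once the input maps are long-lived (here a plain Seq, there a cached partition), every pass mutates them again.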
So as you can see, it is not safe to return or modify the second argument. Without caching, each action recomputes the partition data from scratch, so mutating the freshly created maps goes unnoticed; once the RDD is cached, the very same objects are reused across actions and the mutation leaks into the RDD itself. The same applies to other similar operations, like fold or aggregate, as well.
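To answer the original question directly: the scaladoc explicitly allows modifying and returning the first argument, so a safe merge accumulates into acc and leaves v untouched. A minimal plain-Scala sketch of that idea (the SafeMerge name and the generalised key handling are mine, not Spark API):

```scala
import scala.collection.mutable.{Map => MMap}

object SafeMerge {
  type ACC = MMap[String, Int]

  // Safe merge: accumulate into and return the FIRST argument,
  // leaving the second (the partition's data) untouched.
  def addInPlace(acc: ACC, v: ACC): ACC = {
    for ((k, n) <- v) acc(k) = acc.getOrElse(k, 0) + n
    acc
  }
}
```

If you really want to "throw away" r1 and keep r2's value, copy r2's contents into r1 (or return a fresh copy, e.g. v.clone()) rather than returning v itself.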