apache-spark, accumulator, aggregator

Spark Accumulator in custom Aggregator


I have a custom Aggregator, and in its reduce method I would like to use an accumulator to collect some statistics.

How should I pass the accumulator to the aggregator?

Do I have to pass the accumulator as a constructor parameter, or should I use AccumulatorContext.get(0)?


Solution

  • Create the accumulator outside of your aggregator (and outside of any other code that will run on an executor) and use it within the aggregator. The accumulator variable itself can be passed around like any other variable.

    import org.apache.spark.sql.expressions.Aggregator
    import spark.implicits._   // spark: the active SparkSession

    val ds = sc.parallelize(1 to 5).toDS()
    val acc = sc.longAccumulator
    val mySumAgg = new Aggregator[Int, Int, Int] {
      def reduce(b: Int, a: Int): Int = {
        acc.add(1)   // counts every reduce call made on the executors
        a + b
      }
      [...]          // zero, merge, finish, bufferEncoder, outputEncoder omitted
    }.toColumn

    ds.groupByKey(i => i)
      .agg(mySumAgg)
      .show()

    print("reduce has been called " + acc.value + " times")
    

    If you have created a separate class for your aggregator, you can pass the accumulator in via a constructor parameter (or a setter), as in the sketch below.
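
    As a rough illustration (the class name CountingSumAgg and the accumulator name are made up for this sketch, not part of the original answer), such a class might look like this:

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.util.LongAccumulator

    // Sums the input values while counting reduce calls in a driver-created accumulator.
    class CountingSumAgg(acc: LongAccumulator) extends Aggregator[Int, Int, Int] {
      def zero: Int = 0
      def reduce(b: Int, a: Int): Int = {
        acc.add(1)   // runs on the executors; the counts are merged back on the driver
        b + a
      }
      def merge(b1: Int, b2: Int): Int = b1 + b2
      def finish(reduction: Int): Int = reduction
      def bufferEncoder: Encoder[Int] = Encoders.scalaInt
      def outputEncoder: Encoder[Int] = Encoders.scalaInt
    }

    val acc = sc.longAccumulator("reduce calls")
    ds.groupByKey(i => i)
      .agg(new CountingSumAgg(acc).toColumn)
      .show()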

    You should not use AccumulatorContext, as the docs say:

    An internal class used to track accumulators by Spark itself.