Search code examples
apache-sparkjava-8accumulator

How to intercept partial updates to accumulators on driver?


Spark 1.5.1 + Java 1.8

We are using spark to upload a solid ammount of records to database.

The Action code looks like this:

rdd.foreachPartition(new VoidFunction<Iterator<T>>() {

     @Override
     public void call(Iterator<T> iter) {
          //while there are more records perform the following every 1000 records
          //int[] recoords = statement.executeBatch();
          //accumulator.add(recoords.length);
     }
     // ...
} 

On the driver node there is a thread that monitors the accumulator value. However the value does not get updated. It does get updated only once, by the time applications ends. Even if accumulators used lazy value setting, it should be updated correctly since I am reading the value periodically in the driver node thread.

Am I using the accumulator incorrectly? Is there anyway I can more continuously monitor progress from my workers?


Solution

  • You can monitor accumulator value but it cannot be done continuously, i.e. while the updates happen after tasks have finished.

    Although accumulators are called shared variables there are not really shared. Each task gets its own accumulator which is merged after the task is finished. It means that global values cannot be updated while the task is running.

    To be able to see the updates the number of executors has to less than the number of processed partitions (that corresponds to the number of tasks). The reason for this is to introduce a "barrier" when the accumulator updates get sent to the driver.

    For example:

    import org.apache.spark.{SparkConf, SparkContext}
    
    object App {
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local[4]")
        val sc = new SparkContext(conf)
    
        val accum = sc.accumulator(0, "An Accumulator")
        val rdd = sc.parallelize(1 to 1000, 20)
    
        import scala.concurrent.duration._
        import scala.language.postfixOps
        import rx.lang.scala._
    
        val o = Observable.interval(1000 millis).take(1000)
        val s = o.subscribe(_ => println(accum.value))
        rdd.foreach(x => {
          Thread.sleep(x + 200)
          accum += 1
        })
        s.unsubscribe
        sc.stop
      }
    }
    

    As you can see global value is updated only once per task.

    If you create named accumulator as in provided example you can monitor it status using Spark UI as well. Just open Stages tab, navigate to the specific stage and check accumulators section.

    Is there anyway I can more continuously monitor progress from my workers?

    The most reliable approach is to increase granularity by adding more partitions but it doesn't come cheaper.