Tags: apache-spark, spark-streaming

Is it possible to create a mutable shared data structure without using accumulators in Spark?


I am new to Spark and some things are still unclear to me. But basic knowledge dictates that accumulators are the only mutable variables that can be updated across executors and whose value can be retrieved by the driver. Any other variable initialized in the code and updated on the executors will not have its updated value relayed back to the driver, since the executors run in separate JVMs.
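For reference, here is a minimal sketch of that contract as I understand it (assuming Spark 2.x; the names sc, acc, and plainCounter are illustrative, not from any real codebase):

import java.util.Arrays;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

JavaSparkContext sc = new JavaSparkContext("local[*]", "accumulator-demo");
LongAccumulator acc = sc.sc().longAccumulator("sum");

// A plain variable for contrast: each task mutates its own deserialized copy.
long[] plainCounter = {0L};

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> {
    acc.add(x);           // relayed back to the driver by Spark
    plainCounter[0] += x; // updates only the task-side copy of the closure
});

System.out.println(acc.value());      // 10 -- the driver sees accumulator updates
System.out.println(plainCounter[0]); // 0  -- executor-side updates are not relayed back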

I am working on a part of a project that stores offsets from ZooKeeper in a data structure for future use. As the offsets are obtained on the executors, it seemed almost impossible to have a shared data structure that would relay the per-partition offsets back to the driver. That was until I came across this code in https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html:

AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

directKafkaStream.transformToPair(rdd -> {
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    offsetRanges.set(offsets);
    return rdd;
}).map(
    ...
).foreachRDD(rdd -> {
    for (OffsetRange o : offsetRanges.get()) {
        System.out.println(
            o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
        );
    }
    ...
});

System.out.println(Arrays.toString(offsetRanges.get()));

This contradicts the underlying theory: when I access the value of offsetRanges in my driver, I get the correct updated value (as set inside the transformToPair method), even though it should return null or an empty response. Can someone please explain this behavior?


Solution

  • Is it possible to create a mutable shared data structure without using accumulators in Spark?

    No.

    This contradicts the underlying theory: when I access the value of offsetRanges in my driver, I get the correct updated value

    It doesn't, because the value is never modified outside the driver. The closure passed to transformToPair is executed on the driver, not on the executors.

    Therefore offsetRanges.set(offsets) runs in the same JVM where the original offsetRanges value lives, which is why the driver sees the update; see the sketch below.
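
    To make the driver/executor boundary concrete, here is a minimal sketch (using the directKafkaStream from the question; the comments describe the general Spark Streaming execution model) showing which side each closure runs on:

    directKafkaStream.transform(rdd -> {
        // Executed on the DRIVER once per batch interval, so driver-local
        // state such as an AtomicReference is directly visible and mutable here.
        return rdd;
    }).foreachRDD(rdd -> {
        // Also executed on the DRIVER, once per batch.
        rdd.foreach(record -> {
            // This inner closure is serialized and shipped to the EXECUTORS;
            // any mutation of a captured driver variable happens on a copy
            // and is never relayed back to the driver.
        });
    });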