apache-spark, pyspark, accumulator

PySpark custom accumulators


I was trying to run this program as an example of custom accumulators in PySpark, but I am getting the error 'int' object is not iterable and cannot resolve it. Can someone please help me with this?

import findspark
findspark.init()
from pyspark import AccumulatorParam, SparkContext
sc = SparkContext('local','local')

rdd = sc.parallelize(xrange(10))

class SAP(AccumulatorParam):
    def zero(self, initialValue):
        s=set()
        s.add(initialValue)
        return s
    def addInPlace(self, v1, v2):
        return v1.union(v2)

ids_seen = sc.accumulator(0, SAP())
def inc(x):
    global ids_seen
    ids_seen += x
    return x

rdd.foreach(inc)

Solution

  • In terms of types, addInPlace is (R, R) => R and zero is (R) => R. Your code breaks that contract: the worker-side value of the accumulator is a set, but inc hands a bare int to addInPlace, and set.union requires an iterable, which is where 'int' object is not iterable comes from.
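
    A standalone reproduction of the failing call (my illustration, outside Spark):

    >>> {0}.union(1)
    Traceback (most recent call last):
      ...
    TypeError: 'int' object is not iterable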

    The initial value should be of the same type as the values you expect in the accumulator, so you have to initialize the Accumulator with a set:

    ids_seen = sc.accumulator(set(), SAP())
    

    or

    ids_seen = sc.accumulator({0}, SAP())
    

    and zero should return a copy of the initial value, so every task starts from its own set:

    def zero(self, initialValue):
        return initialValue.copy()
    

    Finally, inc should add a set, not a bare int:

    def inc(x):
        global ids_seen
        ids_seen += {x}
        return x
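
    Putting the pieces together, here is a minimal end-to-end sketch of the corrected program (I use range instead of xrange so it also runs on Python 3, and the final print is my addition to show how to inspect the result; accumulator values can only be read on the driver, via ids_seen.value):

    import findspark
    findspark.init()
    from pyspark import AccumulatorParam, SparkContext

    class SAP(AccumulatorParam):
        def zero(self, initialValue):
            # start each task from its own copy of the initial set
            return initialValue.copy()

        def addInPlace(self, v1, v2):
            # both arguments are sets, so union is well defined
            return v1.union(v2)

    sc = SparkContext('local', 'local')
    rdd = sc.parallelize(range(10))

    ids_seen = sc.accumulator(set(), SAP())

    def inc(x):
        global ids_seen
        ids_seen += {x}  # wrap each element in a set
        return x

    rdd.foreach(inc)
    print(ids_seen.value)  # {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}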