I was trying to run this program as an example of custom accumulators in PySpark. I am getting the error TypeError: 'int' object is not iterable and cannot resolve it. Can someone please help me with this?
    import findspark
    findspark.init()

    from pyspark import AccumulatorParam, SparkContext

    sc = SparkContext('local', 'local')
    rdd = sc.parallelize(xrange(10))

    class SAP(AccumulatorParam):
        def zero(self, initialValue):
            s = set()
            s.add(initialValue)
            return s

        def addInPlace(self, v1, v2):
            return v1.union(v2)

    ids_seen = sc.accumulator(0, SAP())

    def inc(x):
        global ids_seen
        ids_seen += x
        return x

    rdd.foreach(inc)
In terms of types, addInPlace is (R, R) => R and zero is (R) => R.
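This is also where the error in the question comes from: on each worker the accumulator starts from zero(0), which the original zero turns into the set {0}, and ids_seen += x then calls addInPlace({0}, x) with a plain int, so union receives something it cannot iterate over:

    {0}.union(1)   # TypeError: 'int' object is not iterable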
The initial value should be of the same type as the type you expect in the accumulator, so you have to initialize the Accumulator with a set:

    ids_seen = sc.accumulator(set(), SAP())

or

    ids_seen = sc.accumulator({0}, SAP())
and zero should be:

    def zero(self, initialValue):
        return initialValue.copy()
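The copy matters here: with a set as the initial value, the original zero would fail outright, because a set cannot be added as an element of another set:

    s = set()
    s.add({0})   # TypeError: unhashable type: 'set'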
Finally, inc should add a set:

    def inc(x):
        global ids_seen
        ids_seen += {x}
        return x
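Putting the pieces together, a minimal runnable version (same names as in the question; range replaces xrange so it also works on Python 3):

    import findspark
    findspark.init()

    from pyspark import AccumulatorParam, SparkContext

    class SAP(AccumulatorParam):
        def zero(self, initialValue):
            # Start from an independent copy of the initial set
            return initialValue.copy()

        def addInPlace(self, v1, v2):
            # Merge two partial results: (set, set) -> set
            return v1.union(v2)

    sc = SparkContext('local', 'local')
    rdd = sc.parallelize(range(10))

    ids_seen = sc.accumulator(set(), SAP())

    def inc(x):
        global ids_seen
        ids_seen += {x}   # add a one-element set, not a bare int
        return x

    rdd.foreach(inc)
    print(ids_seen.value)   # {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}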