I'm testing an Accumulator in PySpark, but it goes wrong:
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row

def test():
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf).getOrCreate()
    rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
                           Row(user="hadoop", item="book"), Row(user="python", item="duck")])
    acc = sc.accumulator(0)
    print("accumulator: {}".format(acc))

    def imap(row):
        global acc
        acc += 1
        return row

    rdds.map(imap).foreach(print)
    print(acc.value)
The error is:
...
return f(*args, **kwargs)
File "test_als1.py", line 205, in imap
acc += 1
NameError: name 'acc' is not defined
But I declared acc as a global variable, so how should I write this code?
The problem is that imap references a global variable that doesn't exist: the assignment in test only creates a local variable inside that function. This simple program (without Spark) fails with the same error for the same reason:
def foo():
    blah = 1
    def bar():
        global blah
        print(blah)
    bar()

if __name__ == '__main__':
    foo()
Assigning acc at the module level works:
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf).getOrCreate()
    rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
                           Row(user="hadoop", item="book"), Row(user="python", item="duck")])
    acc = sc.accumulator(0)
    print("accumulator: {}".format(acc))

    def imap(row):
        global acc
        acc += 1
        return row

    rdds.map(imap).foreach(print)
    print(acc.value)
Adding a global acc statement to test is an alternative if you need to keep the function test.
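To illustrate that alternative without needing a Spark cluster, here is a plain-Python sketch that mirrors the structure of test, using an ordinary integer in place of sc.accumulator(0); the key point is the global acc inside test, which makes the assignment create a module-level name that imap's own global acc can then find:

```python
def test():
    global acc           # assignment below now creates a module-level name
    acc = 0              # stands in for sc.accumulator(0) in this sketch

    def imap(row):
        global acc       # refers to the same module-level name, so no NameError
        acc += 1
        return row

    # stands in for rdds.map(imap).foreach(print)
    for row in ["spark", "hadoop", "python"]:
        print(imap(row))

    return acc

print(test())  # prints 3
```

In the real Spark job the same two global declarations apply unchanged; the accumulator object itself takes care of collecting the worker-side increments back to the driver.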