Tags: python, python-3.x, pyspark, accumulator

NameError: name 'acc' is not defined in pyspark accumulator


I am testing an accumulator in PySpark, but it fails:

def test():
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf).getOrCreate()
    rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
                            Row(user="hadoop", item="book"), Row(user="python", item="duck")])

    acc = sc.accumulator(0)
    print("accumulator: {}".format(acc))

    def imap(row):
        global acc
        acc += 1
        return row

    rdds.map(imap).foreach(print)
    print(acc.value)

The error is:

...
return f(*args, **kwargs)
File "test_als1.py", line 205, in imap
acc += 1
NameError: name 'acc' is not defined

But I declared acc as a global variable in imap. How should I write this code?


Solution

  • The problem is that imap declares acc as global, but no module-level variable named acc exists. A global statement always refers to the module scope, never to an enclosing function, and the assignment in test only creates a variable local to test. This simple program (without Spark) fails with the same error for the same reason:

    def foo():
        blah = 1
        def bar():
            global blah
            print(blah)
        bar()
    
    
    if __name__ == '__main__':
        foo()
    

    Assigning acc at the module level works:

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import Row

    if __name__ == '__main__':
        conf = SparkConf().setAppName("test").setMaster("local[*]")
        sc = SparkContext.getOrCreate(conf=conf)
        rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
                               Row(user="hadoop", item="book"), Row(user="python", item="duck")])
    
        # Assigned at module level, so the global statement in imap can find it.
        acc = sc.accumulator(0)
        print("accumulator: {}".format(acc))
    
        def imap(row):
            global acc
            acc += 1
            return row
    
        rdds.map(imap).foreach(print)
        print(acc.value)
    

    If you need to keep the function test, an alternative is to add a global acc statement at the top of test, so that the assignment there creates a module-level variable instead of a local one.
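
The alternative of declaring the name global in the enclosing function can be sketched without Spark, reusing the foo/bar example from above. This is only a minimal illustration of the scoping rule: blah stands in for acc, and the increment and the result variable are added here for demonstration and are not part of the original code.

```python
def foo():
    # Declaring blah global in the outer function makes the assignment
    # below create a module-level name instead of a local variable of foo.
    global blah
    blah = 1

    def bar():
        # This now resolves, because foo created the module-level name.
        global blah
        blah += 1
        return blah

    return bar()


# "result" is a name introduced for this sketch only.
result = foo()
print(result)
```

Applied to the question's code, the same change would be a single global acc line at the top of test, before acc = sc.accumulator(0); imap can then stay as it is.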