Search code examples
pythonapache-sparkpysparkmapreduce

Update global variables by using map reduce


Assume I have this in pyspark:

def condi( x ):
    if x["age"] <= 2:
        return True
    else:
        return False

def add_count( x ):
    global aa
    aa += 1
    x["count"] += 10000
    return x

sc = pyspark.SparkContext(  master = 'spark://192.168.56.103:7077',appName = 'test' )

data = [{"age":1,"count":10},{"age":2,"count":20},{"age":3,"count":30}]

data = sc.parallelize( data )

global aa
aa = 0

k = data.map( lambda x : add_count( x ) if condi( x ) else x )

print( k.collect() )
print( aa )

Output like this :

[{'count': 10010, 'age': 1}, {'count': 10020, 'age': 2}, {'count': 30, 'age': 3}] # data
0 # aa

The global variable aa doesn't modify at all.

How could I modify global variables by using map reduce ?


Solution

  • You need to declare aa as Accumulator, so it will be shared by all executors. Please use

    aa = sc.accumulator(0)
    

    instead of

    aa = 0
    

    After this change, the value printed out will be 2.

    Explanation: each executor uses its own local copy of variables. So adding +1 to one copy of aa on an executor does not change the value of aa on the driver. The statement print( aa ) is executed on the driver and therefore does not see the changes on the executors.

    You can also check this question.