Tags: dictionary, apache-spark, pyspark, accumulator

accumulator in pyspark with dict as global variable


Just for learning purposes, I tried to set up a dictionary as a global variable backed by an accumulator. The add function works well on its own, but when I run the code and update the dictionary inside the map function, it always comes back empty.

However, similar code that uses a list as the global variable works fine.
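(For reference, the list version looks roughly like this; it is a minimal sketch, since the original list code was not shown:

class ListParam(AccumulatorParam):
    def zero(self, value):
        return []

    def addInPlace(self, acc1, acc2):
        acc1.extend(acc2)
        return acc1  # return the merged list
)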

import re

from pyspark.accumulators import AccumulatorParam


class DictParam(AccumulatorParam):
    def zero(self, value):
        return dict()

    def addInPlace(self, acc1, acc2):
        acc1.update(acc2)
        return acc1  # addInPlace must return the merged value

if __name__ == "__main__":
    sc, sqlContext = init_spark("generate_score_summary", 40)
    rdd = sc.textFile('input')
    # print(rdd.take(5))

    dict1 = sc.accumulator({}, DictParam())


    def file_read(line):
        global dict1
        ls = re.split(',', line)
        dict1 += {ls[0]: ls[1]}
        return line


    rdd = rdd.map(lambda x: file_read(x)).cache()
    print(dict1)

Solution

  • I believe that print(dict1) simply gets executed before the rdd.map() does.

    In Spark, there are two types of operations:

    • transformations, which describe the future computation
    • and actions, which actually trigger the execution
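
    For instance (a quick sketch, assuming rdd is an RDD of strings):

    rdd2 = rdd.map(lambda s: s.upper())  # transformation: returns immediately, nothing runs yet
    n = rdd2.count()                     # action: triggers the actual computation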

    Accumulators are updated only when some action is executed:

    Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action.

    If you check out the end of this section of the docs, there is an example exactly like yours:

    accum = sc.accumulator(0)
    def g(x):
        accum.add(x)
        return f(x)
    data.map(g)
    # Here, accum is still 0 because no actions have caused the `map` to be computed.
    

    So you would need to add some action, for instance:

    rdd = rdd.map(lambda x: file_read(x)).cache() # transformation
    foo = rdd.count() # action
    print(dict1)
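
    Putting it together, a corrected end-to-end version might look like this (a sketch, not your exact setup: it uses a plain SparkContext instead of your init_spark helper, and it includes the return in addInPlace that the merge needs):

    import re

    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam

    class DictParam(AccumulatorParam):
        def zero(self, value):
            return dict()

        def addInPlace(self, acc1, acc2):
            acc1.update(acc2)
            return acc1  # the merged dict must be returned

    sc = SparkContext(appName="generate_score_summary")
    dict1 = sc.accumulator({}, DictParam())

    def file_read(line):
        ls = re.split(',', line)
        dict1.add({ls[0]: ls[1]})  # same as dict1 += {...}
        return line

    rdd = sc.textFile('input').map(file_read).cache()  # transformation
    rdd.count()                                        # action: runs the map
    print(dict1.value)                                 # accumulator is now populated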
    

    Please make sure to check the details of the various RDD functions and the accumulator peculiarities, because they might affect the correctness of your result. (For instance, rdd.take(n) will by default only scan one partition, not the entire dataset, as the sketch below illustrates.)
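
    A small demonstration of both caveats (a sketch; the exact counts depend on partitioning, and it assumes a live SparkContext sc):

    acc = sc.accumulator(0)

    def tag(x):
        acc.add(1)   # count every element the map actually touches
        return x

    nums = sc.parallelize(range(100), 4).map(tag)
    nums.take(5)      # typically scans only the first partition
    print(acc.value)  # likely 25 here, not 100
    nums.count()      # scans the full dataset
    print(acc.value)  # likely 125: take()'s partial updates plus count()'s full pass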