Tags: apache-spark, pyspark, accumulator

PySpark accumulator that computes maximum value


What if we need the accumulator's final value to be the maximum of all the values reported by the tasks/nodes?

Example :

  • accumulator a

    • node1 sets: 5
    • node2 sets: 6
    • node3 sets: 4

Since 6 is the largest of these values, the final value of the accumulator should be 6.


Solution

  • You'll have to define an AccumulatorParam like this one:

    from pyspark import AccumulatorParam

    class MaxAccumulatorParam(AccumulatorParam):
        def zero(self, initialValue):
            # Initial value for each task-local copy of the accumulator.
            return initialValue
        def addInPlace(self, v1, v2):
            # Merge two values by keeping the larger one.
            return max(v1, v2)
    

    Spark calls zero() to initialize each task's local copy of the accumulator and addInPlace() to merge values, so it can be used as shown below:

    sc = spark.sparkContext
    acc = sc.accumulator(float("-inf"), MaxAccumulatorParam())
    rdd = sc.parallelize([5, 6, 4], 3)
    
    acc.value
    # -inf
    
    rdd.foreach(lambda x: acc.add(x))
    acc.value
    # 6
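
    For reference, a minimal self-contained version might look like the sketch below; the local[*] master, the app name, and the __main__ guard are assumptions for a standalone script, not part of the original answer:

    from pyspark import AccumulatorParam
    from pyspark.sql import SparkSession

    class MaxAccumulatorParam(AccumulatorParam):
        def zero(self, initialValue):
            return initialValue
        def addInPlace(self, v1, v2):
            return max(v1, v2)

    if __name__ == "__main__":
        # local[*] and the app name are arbitrary choices for a local run.
        spark = SparkSession.builder.master("local[*]").appName("max-accumulator").getOrCreate()
        sc = spark.sparkContext

        # float("-inf") is the identity element for max, so any observed value replaces it.
        acc = sc.accumulator(float("-inf"), MaxAccumulatorParam())
        sc.parallelize([5, 6, 4], 3).foreach(lambda x: acc.add(x))

        print(acc.value)  # 6
        spark.stop()

    One nicety of this design: max is idempotent, so even if Spark re-applies an accumulator update when a task is retried, the final value cannot be corrupted.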