Tags: apache-spark, pyspark, accumulator, hortonworks-sandbox

Spark Accumulator not working


I want to count the closed orders in this data using an accumulator, but it gives me an incorrect answer: just zero (0). What is the problem? I am using the Hortonworks Sandbox and submitting the job with spark-submit. The code is below.

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('closedcount')
sc   =  SparkContext(conf=conf)
rdd = sc.textFile("/tmp/fish/itversity/retail_db/orders/")
N_closed = sc.accumulator(0)
def is_closed(N_closed, line):
  status =(line.split(",")[-1]=="CLOSED")
  if status:
    N_closed.add(1)
  return status
closedRDD = rdd.filter(lambda x: is_closed(N_closed, x))
print('The answer is ' + str(N_closed.value))

But when I submit it, I get zero.

 spark-submit --master yarn closedCounter.py


Update:

After changing my code as follows, it works. Is this the right way to do it?

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('closedcount')
sc   =  SparkContext(conf=conf)
rdd = sc.textFile("/tmp/fish/itversity/retail_db/orders/")
N_closed = sc.accumulator(0)
def is_closed(line):
  global N_closed
  status =(line.split(",")[-1]=="CLOSED")
  if status:
    N_closed.add(1)

rdd.foreach(is_closed)
print('The answer is ' + str(N_closed.value))
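A side note on the global N_closed line in this version: in Python, a global statement is only needed when a function rebinds a name (for example N_closed = N_closed + 1). Calling a method such as add() on the object the name already refers to works without it, and the solution's version of is_closed() omits that line. The plain-Python sketch below uses a hypothetical Counter class as a stand-in for the accumulator and made-up sample lines; no Spark is involved.

```python
class Counter:
    """Hypothetical stand-in for an accumulator: exposes add() and value."""

    def __init__(self, initial=0):
        self.value = initial

    def add(self, amount):
        self.value += amount


N_closed = Counter(0)


def is_closed(line):
    # No `global` statement: we only call a method on the existing object,
    # we never rebind the name N_closed itself.
    status = line.split(",")[-1] == "CLOSED"
    if status:
        N_closed.add(1)
    return status


# Made-up sample lines whose last field is the order status
lines = [
    "1,2013-07-25,11599,CLOSED",
    "2,2013-07-25,256,PENDING_PAYMENT",
    "4,2013-07-25,8827,CLOSED",
]
for line in lines:
    is_closed(line)

print(N_closed.value)  # 2
```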


Second Update:

I understand it now. In Jupyter Notebook, without YARN, I got the correct answer because I had called an action (count) before reading the accumulator's value.
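The same laziness can be seen in plain Python 3, where the built-in filter() returns a lazy iterator: the predicate, and any side effect inside it, runs only when the iterator is consumed. This is only an analogy, not Spark code; the closed_count list used as a counter and the sample lines are made up for illustration, and consuming the iterator plays the role of count().

```python
# Made-up sample lines whose last field is the order status
lines = [
    "1,2013-07-25,11599,CLOSED",
    "2,2013-07-25,256,PENDING_PAYMENT",
    "3,2013-07-25,12111,COMPLETE",
    "4,2013-07-25,8827,CLOSED",
]

closed_count = [0]  # mutable counter, standing in for an accumulator


def is_closed(line):
    status = line.split(",")[-1] == "CLOSED"
    if status:
        closed_count[0] += 1  # side effect, like N_closed.add(1)
    return status


closed = filter(is_closed, lines)  # lazy: is_closed has not run yet
before = closed_count[0]           # nothing has been evaluated so far

n = sum(1 for _ in closed)         # consuming the iterator, like count()
after = closed_count[0]            # the side effects have now happened

print(before, n, after)  # 0 2 2
```

In PySpark, the value returned by count() on the filtered RDD is itself the number of closed orders, so for this particular count the accumulator is not strictly needed.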


Solution

  • Computations inside transformations are evaluated lazily, so unless an action is called on an RDD, its transformations are not executed. As a result, accumulators updated inside functions passed to transformations such as map() or filter() are not updated until some action runs on the RDD.

    https://www.edureka.co/blog/spark-accumulators-explained

    (Examples in Scala)

    In short, you need to perform an action on the RDD.

    For example:

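    # Assumes sc and rdd are defined as in the question
    N_closed = sc.accumulator(0)
    def is_closed(line):
        # The order status is the last comma-separated field
        status = line.split(",")[-1]=="CLOSED"
        if status:
            N_closed.add(1)
        return status  # ignored by foreach(); only the side effect matters
    
    # foreach() is an action, so is_closed runs on every line before the print
    rdd.foreach(is_closed)
    print('The answer is ' + str(N_closed.value))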
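One more caveat, from the Spark programming guide: accumulator updates made inside actions (such as foreach()) are applied only once per task even if a task is retried, but updates made inside transformations (such as filter() or map()) can be applied more than once. A common way this happens is recomputation: without caching, every action re-runs the whole lineage, so calling count() twice on the filtered RDD from the question would increment the accumulator twice for every closed order. The sketch below is a hypothetical toy model in plain Python (ToyRDD and Accumulator are made-up classes, not PySpark's API) that mimics only this recomputation case; it does not model task retries.

```python
class Accumulator:
    """Toy accumulator: a counter with add() and value (not pyspark's class)."""

    def __init__(self, initial=0):
        self.value = initial

    def add(self, amount):
        self.value += amount


class ToyRDD:
    """Toy lazy dataset: transformations are recorded, actions run them."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)

    def filter(self, predicate):
        # Transformation: record the step and return a new ToyRDD; nothing runs.
        return ToyRDD(self._data, self._steps + (predicate,))

    def _compute(self):
        # Re-run every recorded step from the source data (no caching).
        for item in self._data:
            if all(step(item) for step in self._steps):
                yield item

    def count(self):
        # Action: triggers the computation and returns the number of items.
        return sum(1 for _ in self._compute())

    def foreach(self, func):
        # Action: applies func to every element for its side effects.
        for item in self._compute():
            func(item)


# Made-up sample lines whose last field is the order status
lines = [
    "1,2013-07-25,11599,CLOSED",
    "2,2013-07-25,256,PENDING_PAYMENT",
    "4,2013-07-25,8827,CLOSED",
]
rdd = ToyRDD(lines)

# Accumulator updated inside a transformation (filter)
acc_in_filter = Accumulator(0)


def is_closed(line):
    status = line.split(",")[-1] == "CLOSED"
    if status:
        acc_in_filter.add(1)
    return status


closedRDD = rdd.filter(is_closed)
v0 = acc_in_filter.value  # no action yet
closedRDD.count()
v1 = acc_in_filter.value  # after the first action
closedRDD.count()         # the second action recomputes the filter
v2 = acc_in_filter.value  # closed orders counted twice

# Accumulator updated inside an action (foreach), run once
acc_in_foreach = Accumulator(0)


def count_closed(line):
    if line.split(",")[-1] == "CLOSED":
        acc_in_foreach.add(1)


rdd.foreach(count_closed)

print(v0, v1, v2, acc_in_foreach.value)  # 0 2 4 2
```

If a transformation-side accumulator has to be read after several actions, caching the RDD (or updating the accumulator inside a single action, as in the foreach() version) avoids this kind of recomputation.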