I want to get the number of closed orders from this data using accumulators, but it is giving me an incorrect answer: just zero (0). What is the problem? I am using the Hortonworks Sandbox, and I am running the code below with spark-submit.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('closedcount')
sc = SparkContext(conf=conf)

rdd = sc.textFile("/tmp/fish/itversity/retail_db/orders/")
N_closed = sc.accumulator(0)

def is_closed(N_closed, line):
    status = (line.split(",")[-1] == "CLOSED")
    if status:
        N_closed.add(1)
    return status

closedRDD = rdd.filter(lambda x: is_closed(N_closed, x))
print('The answer is ' + str(N_closed.value))
But when I submit it, I get zero.
spark-submit --master yarn closedCounter.py
Now, when I change my code, it works fine. Is this the right way to do it?
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('closedcount')
sc = SparkContext(conf=conf)

rdd = sc.textFile("/tmp/fish/itversity/retail_db/orders/")
N_closed = sc.accumulator(0)

def is_closed(line):
    global N_closed
    status = (line.split(",")[-1] == "CLOSED")
    if status:
        N_closed.add(1)

rdd.foreach(is_closed)
print('The answer is ' + str(N_closed.value))
I understand it now: in a Jupyter Notebook, without YARN, it gives me the correct answer because I have called an action (count) before checking the value of the accumulator.
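In other words, a minimal sketch of that pattern (assuming the same rdd as in the code above) would be to keep the filter but force evaluation with count() before reading the accumulator:

N_closed = sc.accumulator(0)

def is_closed(line):
    status = (line.split(",")[-1] == "CLOSED")
    if status:
        N_closed.add(1)
    return status

closedRDD = rdd.filter(is_closed)
closedRDD.count()  # count() is an action, so the filter (and the accumulator updates) actually run here
print('The answer is ' + str(N_closed.value))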
Computations inside transformations are evaluated lazily, so unless an action happens on an RDD, the transformations are not executed. As a result, accumulators used inside functions like map() or filter() won't be updated unless some action happens on the RDD.
https://www.edureka.co/blog/spark-accumulators-explained
(Examples in Scala)
But basically, you need to perform an action on the RDD.
For example:
N_closed = sc.accumulator(0)

def is_closed(line):
    status = (line.split(",")[-1] == "CLOSED")
    if status:
        N_closed.add(1)
    return status

rdd.foreach(is_closed)  # foreach is an action, so is_closed runs and the accumulator is updated
print('The answer is ' + str(N_closed.value))
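Note that foreach is an action, so the accumulator updates happen as soon as it is called. One caveat worth knowing: Spark only guarantees exactly-once accumulator updates for updates performed inside actions; updates made inside transformations such as map() or filter() may be applied more than once if a task is re-executed. So if all you need is the count, doing the work inside an action, as above, is the safer pattern.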