Tags: pyspark, accumulator

Count number of elements in a text or list using Spark


I know there are different ways to count the number of elements in a text or list, but I am trying to understand why this particular approach does not work. I am trying to write code equivalent to the following:

A_RDD = sc.parallelize(['a', 1.2, []])

acc = sc.accumulator(0)              # shared counter, starts at 0
acc.value                            # 0
A_RDD.foreach(lambda _: acc.add(1))  # add 1 on the executors for each element
acc.value                            # 3

where the result is 3. To do the same thing, I defined the following function, my_count(_), but I don't know how to get the result: A_RDD.foreach(my_count) does not do anything, and I don't receive any error either. What did I do wrong?

counter = 0

def my_count(_):    # function that counts elements
    global counter
    counter += 1

A_RDD.foreach(my_count)

Solution

  • The A_RDD.foreach(my_count) operation doesn't run in your local Python virtual machine (the driver). It runs on your remote executor nodes, so the driver ships your my_count method to each of the executor nodes, along with the variable counter, since the method references it. Each executor node therefore gets its own copy of the counter variable, which is what the foreach method updates, while the counter variable defined in your driver application is never incremented.

    One easy but risky solution is to collect the RDD on your driver and then compute the count, as shown below. This is risky because the entire RDD content is downloaded into the driver's memory, which may cause a MemoryError. (A safer alternative is sketched after the example.)

    >>> len(A_RDD.collect())
    3
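
    A safer alternative, assuming the same sc and A_RDD objects as above, is to let Spark count on the executors: RDD.count() returns only the final number to the driver, without materializing the data there. The original my_count can also be repaired by updating the accumulator from the first snippet instead of a global variable, since accumulator updates made on the executors are propagated back to the driver. A minimal sketch:

    >>> A_RDD.count()
    3
    >>> acc = sc.accumulator(0)
    >>> def my_count(_):
    ...     acc.add(1)    # accumulator updates flow back to the driver
    ...
    >>> A_RDD.foreach(my_count)
    >>> acc.value
    3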