apache-spark pyspark bigdata rdd apache-spark-dataset

How does RDD.aggregate() work with partitions?


I'm new to Spark and trying to understand how functions like reduce, aggregate etc. work. While going through RDD.aggregate(), I tried changing the zeroValue to something other than the identity values (0 for addition and 1 for multiplication) to see how the internals work.

This is what I tried:

# with identity zeroValue : (0, 0)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
l = sc.parallelize([1, 2, 3, 4])
print(l.aggregate((0, 0), seqOp, combOp))
>>> (10, 4)

# with a different zeroValue : (3, 5)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
l = sc.parallelize([1, 2, 3, 4])
print(l.aggregate((3, 5), seqOp, combOp))
>>> (49, 69)

The 2nd output was something I didn't expect. I was expecting it to be (13, 9) considering I just changed (0, 0) to (3, 5) and the numbers at corresponding positions would add up.

I figured it might have something to do with partitioning. Turns out our RDD had 12 partitions! So, going by the documentation, the zeroValue becomes the initial value of x for every partition. When the partitions are combined, every empty partition contributes an accumulated value of (3, 5). Since there are 12 partitions, and assuming 11 of them are empty, the result should be:

(13 + 3 * 11, 9 + 5 * 11) = (46, 64)

Still not what we got. But we can see that our calculations are just short of ONE (3, 5) from becoming the actual result.

{(46 + 3, 64 + 5) = (49, 69)}

I reduced the number of partitions to 1 (to make sure that all the data is in the same partition and no partition is empty) and performed the same operation. Still got the same anomaly in the result.

print(l.coalesce(1).aggregate((3, 5), seqOp, combOp))
>>> (16, 14)

I definitely expected (13, 9) this time since there is only 1 partition. But my expectation still fell short of the actual result by ONE (3, 5).

{(13 + 3, 9 + 5) = (16 , 14)}

Why does the result from RDD.aggregate() contain one more zeroValue than the number of partitions in the RDD would account for? Also, at what point does this extra zeroValue get added to the accumulated value?


Solution

  • You correctly deduced that the number of partitions was responsible for the change in the result.

    If you try running the following snippet, you'll arrive at the value (13, 9) for the accumulator.

    seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
    zeroValue = (3, 5)
    acc = zeroValue
    for num in [1, 2, 3, 4]:
        acc = seqOp(acc, num)
    acc
    

    Seems plausible, doesn't it? Each element of the list plays the role of y in seqOp, so the first element of acc ends up as zeroValue[0] + sum([1, 2, 3, 4]), which is 13. At the same time, the second element of acc is incremented by 1 at each step, giving zeroValue[1] + len([1, 2, 3, 4]), which is 9. Notice, however, that we didn't even need to define combOp. Now, let's pretend we only had a single partition. The final result is obtained by merging this partition result into a fresh starting value, which is once again zeroValue, using the function that combines accumulators. That function is, naturally, combOp, which we defined as (x[0] + y[0], x[1] + y[1]). With a single partition this gives combOp((3, 5), (13, 9)) = (16, 14), which is exactly the extra zeroValue you observed.
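    The single-partition case can be replayed in plain Python, without Spark. This is only a sketch of the two steps described above, not Spark's actual code; the names partition_result and final are made up for illustration.

```python
from functools import reduce

seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])
zeroValue = (3, 5)

# Step 1: fold the elements of the single partition, starting from zeroValue.
partition_result = reduce(seqOp, [1, 2, 3, 4], zeroValue)

# Step 2: the final merge also starts from zeroValue, so it is counted again.
final = combOp(zeroValue, partition_result)

print(partition_result)  # (13, 9)
print(final)             # (16, 14)
```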

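    The reason we get the behavior you described when working with multiple partitions is that the reduce part happens on Spark's driver, after the per-partition results have been collected there. Each empty partition sends back only zeroValue, since nothing was accumulated, while each nonempty partition sends the result of applying seqOp to its elements, which also started from zeroValue. The driver then uses combOp to fold these 12 results together, and that fold itself starts from zeroValue. In total zeroValue is counted 12 + 1 = 13 times, so the result is (10 + 13 * 3, 4 + 13 * 5) = (49, 69).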
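To check the counting argument without a cluster, here is a minimal pure-Python model of the behavior described above. The helper simulate_aggregate is hypothetical and is not part of the Spark API; it assumes each partition is folded from zeroValue and that the merge of the partition results starts from zeroValue once more. The partition layout (all four elements in one partition, eleven empty ones) is also an assumption. Because seqOp and combOp here only add, any other split of the elements across 12 partitions gives the same totals.

```python
from functools import reduce

def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical model of aggregate: fold each partition, then merge the results."""
    # Every partition, empty or not, starts its fold from zero_value.
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    # The merge of the partition results starts from zero_value once more.
    return reduce(comb_op, per_partition, zero_value)

seq_op = lambda x, y: (x[0] + y, x[1] + 1)
comb_op = lambda x, y: (x[0] + y[0], x[1] + y[1])

# Assumed layout: 12 partitions, one holding all the data and 11 empty.
twelve_partitions = [[1, 2, 3, 4]] + [[] for _ in range(11)]
one_partition = [[1, 2, 3, 4]]

print(simulate_aggregate(twelve_partitions, (3, 5), seq_op, comb_op))  # (49, 69)
print(simulate_aggregate(one_partition, (3, 5), seq_op, comb_op))      # (16, 14)
print(simulate_aggregate(twelve_partitions, (0, 0), seq_op, comb_op))  # (10, 4)
```

With n partitions, zeroValue enters the result n + 1 times, which is why only an identity value such as (0, 0) gives an answer that does not depend on the partition count.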