The foreach action on a partitioned RDD yields unpredictable results. Why does this happen?
For instance, I attempted to print doubled values using the foreach action on an RDD partitioned into two slices (numSlices=2). Here is the code:
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], numSlices=2)
print(numbers.collect())

def f(x):
    print(x, x*2)

numbers.foreach(lambda x: f(x))
However, the output is inconsistent. Sometimes it correctly prints each original value x alongside 2x, but when I rerun the code the output differs, as shown below. Should I avoid using foreach when working with partitioned RDDs?
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
15 102
62 124
73 146
84 168
9 18
Apache Spark is a parallel computing engine. It does not process partitions sequentially but in parallel.
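You can inspect how parallelize split the data with glom(), which turns each partition into a list of its elements (a quick sketch, reusing the sc and numbers defined in the question):

print(numbers.glom().collect())
# e.g. [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]] -- two partitions, processed by two tasks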
print(x, x*2) basically performs the following actions sequentially: print(x), print(' '), print(x*2), print('\n'). Now, let's analyze the first line of your output. Suppose that 1 and 5 sit in distinct partitions. If they are processed in parallel by two separate executors, the following sequence of actions could happen:
print(1) # executor_1
print(5) # executor_2
print(' ') # executor_1
print(' ') # executor_2
print(5*2) # executor_2
print(1*2) # executor_1
print('\n') # executor_2
print('\n') # executor_1
For each executor taken on its own, the sequence of actions is consistent, but because the two sequences are interleaved, you get the strange output you witnessed:
15 102
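You can verify this write-per-token behaviour in plain CPython, no Spark needed. Here is a minimal sketch using a hypothetical RecordingWriter class that logs every individual write() call that print issues:

class RecordingWriter:
    # Hypothetical helper: records each individual write() call.
    def __init__(self):
        self.calls = []
    def write(self, s):
        self.calls.append(s)
        return len(s)

w = RecordingWriter()
print(1, 2, file=w)
print(w.calls)  # ['1', ' ', '2', '\n'] -- four separate writes

Since each of those four writes can be interleaved with writes coming from another executor process, the tokens of two output lines can end up mixed together.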
Notice that if you force Spark to process everything in a single partition (and therefore sequentially), the output becomes orderly:
>>> numbers.coalesce(1).foreach(lambda x: f(x))
1 2
2 4
3 6
4 8
5 10
6 12
7 14
8 16
9 18
10 20
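More generally, foreach runs on the executors, so on a real cluster these prints land in the executor logs rather than on your driver console; seeing them at all here is a side effect of running in local mode. If you just want to print values in a deterministic order on the driver, one common pattern (a sketch, assuming the RDD is small enough to fit in driver memory) is to collect first:

for x in numbers.collect():
    print(x, x*2)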