As a newcomer to Spark, I have been looking at its Python example for estimating PI.
I would like to understand Spark's performance by re-estimating PI several times within the same context.
What I observe is that the value of PI is unchanged across these re-estimations, and the timings seem to indicate that intermediate RDDs are being implicitly cached and then reused in the subsequent calculations.
Is there any way to configure Spark to control this behaviour, so that the intermediate RDDs are always regenerated? Using unpersist() seems to have no effect.
My code that reproduces this issue is here on GitHub; run it with
`spark-submit pi2.py`
to get the following results:
No caching-0: 8000 generated 6256 in 1.14984297752 secs (PI = 3.128)
No caching-1: 8000 generated 6256 in 0.0597329139709 secs (PI = 3.128)
No caching-2: 8000 generated 6256 in 0.0577840805054 secs (PI = 3.128)
No caching-3: 8000 generated 6256 in 0.0545349121094 secs (PI = 3.128)
No caching-4: 8000 generated 6256 in 0.0544559955597 secs (PI = 3.128)
With caching-0: 8000 generated 6256 in 0.069139957428 secs (PI = 3.128)
With caching-1: 8000 generated 6256 in 0.0549170970917 secs (PI = 3.128)
With caching-2: 8000 generated 6256 in 0.0531771183014 secs (PI = 3.128)
With caching-3: 8000 generated 6256 in 0.0502359867096 secs (PI = 3.128)
With caching-4: 8000 generated 6256 in 0.0557379722595 secs (PI = 3.128)
There are a few things happening here. First, you are in fact not caching the RDD. From your GitHub link:
# Now persist the intermediate result
sc.parallelize(xrange(1, n + 1), partitions).map(f).persist()
This creates a new RDD, applies a map, then marks the resulting RDD for persistence. You don't keep a reference to it and never run an action on it, so it is effectively gone: the later computations build their own RDDs and never touch it.
Next, the first run is likely slower because Spark has to ship your functions to the workers. So there is some caching at work, but of your code rather than your data.
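To actually reuse the cached data, keep the RDD returned by `persist()` and run your actions on that same object. A minimal sketch, assuming an existing SparkContext `sc` and an `f` like the one in the Spark example (`count_hits` and its parameters are names I made up for illustration, and I use `range` where your Python 2 code has `xrange`):

```python
from operator import add
from random import random


def f(_):
    # Draw a point in the square [-1, 1] x [-1, 1]; 1 if it lands in the unit circle.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x * x + y * y < 1 else 0


def count_hits(sc, n, partitions, runs=2):
    # Hold on to the RDD that persist() returns; without this reference
    # there is nothing to reuse later.
    samples = sc.parallelize(range(1, n + 1), partitions).map(f).persist()
    try:
        # Every action runs on the same RDD object, so runs after the
        # first can be served from the cache.
        return [samples.reduce(add) for _ in range(runs)]
    finally:
        # Release the cached partitions once we are done with them.
        samples.unpersist()
```

Called as `count_hits(sc, 8000, 2)`, only the first `reduce` should compute the map; the later ones read the cached partitions until `unpersist()` is called. Note that the counts are then identical by design, since the random points are generated only once.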
Lastly, the randomness: `seed()` seeds the RNG in your driver. The seed value gets broadcast on the first run to all workers along with `f()` (since the seed is referenced inside `random()`). When you call `seed()` again, it changes the seed in the driver but not in the copies of the function that have already been sent to the workers, so you get the same results over and over again.
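One way to get fresh random numbers on every run is to seed on the workers rather than in the driver, for example once per partition via `mapPartitionsWithIndex`. A rough sketch, not tested against your script; `make_partition_sampler`, `run_seed` and the way the seed is mixed with the partition index are my own choices:

```python
import random


def make_partition_sampler(run_seed):
    """Build a function suitable for RDD.mapPartitionsWithIndex."""

    def sample_partition(index, iterator):
        # Create the generator on the worker, seeded from the run number
        # and the partition index, so nothing depends on driver RNG state.
        rng = random.Random(run_seed * 1000003 + index)
        for _ in iterator:
            x = rng.random() * 2 - 1
            y = rng.random() * 2 - 1
            # Emit 1 for a point inside the unit circle, 0 otherwise.
            yield 1 if x * x + y * y < 1 else 0

    return sample_partition
```

With a SparkContext `sc`, run number `run` could then be estimated with something like `sc.parallelize(range(n), partitions).mapPartitionsWithIndex(make_partition_sampler(run)).reduce(add)` (with `add` from `operator`). Passing a different `run_seed` each time should give a different estimate, while repeating the same `run_seed` reproduces the earlier result.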