I have 18 Spark executors, each with 102GB of memory and 26 cores, so around 1836GB of total memory and 468 total cores available for Spark, as you can see below:
I run the following app, which uses all the resources:
As I don't do any cache/persist/broadcast, I have set the Spark environment as follows:
So basically :
--conf "spark.memory.fraction=0.6"
--conf "spark.memory.storageFraction=0.1"
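For reference, Spark's unified memory manager first subtracts a fixed 300 MB of reserved memory from the heap and then applies `spark.memory.fraction`; `spark.memory.storageFraction` only sets a soft boundary inside that pool. A minimal sketch of the arithmetic for one 102GB executor (pure Python, figures rounded):

```python
RESERVED_MB = 300  # fixed reserved memory in Spark's unified memory manager

def unified_memory_gb(heap_gb, memory_fraction=0.6, storage_fraction=0.1):
    """Split an executor heap into unified (storage + execution) memory,
    following spark.memory.fraction / spark.memory.storageFraction."""
    usable = (heap_gb * 1024 - RESERVED_MB) * memory_fraction / 1024
    storage = usable * storage_fraction  # evictable storage region (soft boundary)
    execution = usable - storage         # buffers for shuffles, joins, sorts
    return usable, storage, execution

usable, storage, execution = unified_memory_gb(102)
print(f"{usable:.0f} GB unified, {storage:.0f} GB storage, {execution:.0f} GB execution")
# ≈ 61 GB unified, ≈ 6 GB storage, ≈ 55 GB execution per executor
```

This matches the 0.6 * 102GB ≈ 61 GB figure below; the smaller number shown in the Executors tab comes from further on-heap accounting by the UI.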
The following Spark UI Executors tab is a bit misleading, but theoretically I have:
0.6 * 102GB = 61 GB, but let's say ~53 GB, for total storage and execution memory per executor, i.e. 18 * 53 GB ≈ 970 GB across the cluster.
0.9 * 53 GB = 47 GB for Spark execution memory, and 0.1 * 53 GB = 5 GB for Spark storage memory, i.e. 18 * 47 GB = 846 GB of total execution memory.
As you can see in the previous screenshot:
Regarding the stages, the most important one is the last, active stage (the one that actively does shuffle reads and produces output):
Indeed, my app consists of one main large .join() between two datasets, which triggers a full shuffle (534.9 GB written at stage 11) and a large shuffle read at stage 16 (the screenshot shows a shuffle read of 5.8 GB out of 534.9 GB).
We can zoom in on this join thanks to the following Spark UI SQL graph:
With some simple Unix commands, I can confirm that spark.local.dir (used for temp files during shuffle) consumes at most 600GB summed across all nodes. This number matches the 534.9 GB reported by the UI during the main shuffle write.
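For illustration, the per-node check can be done with something like the following (the `/tmp/spark-*` path is an assumption; adjust it to your actual spark.local.dir):

```shell
# Sum on-disk shuffle temp usage under spark.local.dir on one node
# (repeat on every node and add the results up)
for d in /tmp/spark-*; do
  du -sb "$d" 2>/dev/null
done | awk '{s+=$1} END {printf "%.1f GB\n", s/1024^3}'
```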
Question:
Why does Spark spill to disk in this case?
Indeed, Spark seems to use neither the 846GB of execution memory nor the 970GB of total Spark memory (execution + storage) to hold part of the shuffle write results and speed up the computation.
Spark seems to spill all the data to disk: around 600GB is written on disk. This 600GB appears to be compressed data (since spark.shuffle.compress=true).
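Those two figures imply a compression ratio of roughly 6.7:1, which is plausible for shuffle data compressed with the default lz4 codec:

```python
uncompressed_gb = 3.9 * 1024   # ~3.9 TB, as suggested by the Spark UI
on_disk_gb = 600               # observed under spark.local.dir

ratio = uncompressed_gb / on_disk_gb
print(f"compression ratio ~ {ratio:.1f}:1")  # ~ 6.7:1
```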
I know that the uncompressed data might be about 3.9 TB, as the Spark UI suggests. But since this is Spark 3.0.2, which is supposed to support efficient Kryo serialization internally, it still remains unclear why Spark spills everything in this case.
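As a side note: Kryo is not the default value of spark.serializer even in Spark 3.x. Spark applies it automatically only when shuffling RDDs of simple types, and DataFrame/Dataset joins serialize rows with Spark SQL's own Tungsten binary format anyway. Enabling Kryo explicitly looks like this (config fragment, in the same style as the flags above):

```shell
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
```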
From an AWS guide, https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/ (similar statements can be found elsewhere):
Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel.
In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or data skew, which can cause straggling executors.
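The quoted behavior can be modeled in a few lines: during the map phase, every record is bucketed by the hash of its join key, and the buckets are persisted to local disk for reducers to fetch later, regardless of how much executor memory is free. A toy sketch (names and file format are illustrative, not Spark's actual code):

```python
import os
from collections import defaultdict

def mapper_shuffle_write(records, num_partitions, out_dir):
    """Toy model of a shuffle map task: bucket records by key hash,
    then persist every bucket to local disk for reducers to fetch."""
    buckets = defaultdict(list)
    for key, value in records:
        pid = hash(key) % num_partitions   # which reducer owns this key
        buckets[pid].append((key, value))
    paths = []
    for pid in range(num_partitions):
        path = os.path.join(out_dir, f"shuffle_part_{pid}.txt")
        with open(path, "w") as f:
            for key, value in sorted(buckets[pid]):  # sort-based shuffle
                f.write(f"{key}\t{value}\n")
        paths.append(path)
    return paths
```

Real Spark does the same at scale: the sort-based shuffle writes one data file plus an index file per map task, so the final map output is always materialized on spark.local.dir; execution memory is only used to buffer and sort records before they are written out.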
That is to say, Spark's architecture writes mapper output to local disk for reducer-phase tasks to consume. A join obviously fits that approach as well.