I am trying to understand locality levels on a Spark cluster and their relationship with an RDD's number of partitions and the action performed on it. Specifically, I have a dataframe whose number of partitions is 9647. Then, I performed df.count
on it and observed the following in the Spark UI:
A bit of context: I submitted my job to a YARN cluster with the following configuration:
- executor_memory='10g',
- driver_memory='10g',
- num_executors='5',
- executor_cores='5'
Also, I noticed that the executors came from 5 different nodes (hosts).
From the figure, I found that of all 9644 tasks, more than 95% did not run on the same node as their data. So I am wondering why there are so many rack_local tasks. Specifically, why don't the nodes choose the closest data source to execute on, in other words, why aren't there more node_local tasks?
Thank you
Here are some points to consider. Below you can find some of the factors that affect data locality in Spark:
getPreferredLocations. In the definition of the RDD you can find the getPreferredLocations function, which is responsible for specifying the optimal location of the RDD. For example, if the source is HDFS, Spark will create an instance of HadoopRDD (or NewHadoopRDD) and access the Hadoop API to retrieve the information about the location of the source files, overriding the getPreferredLocations function from its base class. Later on, the TaskScheduler will leverage this information to decide about the locality of each task.
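To see what this produces in practice, you can inspect the hosts Spark derived for each partition: RDD.preferredLocations is the public wrapper around getPreferredLocations. A minimal sketch, assuming a live SparkContext named sc and an illustrative HDFS path:

```scala
// Minimal sketch: print the preferred hosts Spark derived per partition.
// Assumes a live SparkContext `sc`; the HDFS path is illustrative only.
val rdd = sc.textFile("hdfs:///data/events") // creates a HadoopRDD under the hood

rdd.partitions.foreach { p =>
  // preferredLocations(split) is the public wrapper around getPreferredLocations
  val hosts = rdd.preferredLocations(p).mkString(", ")
  println(s"partition ${p.index} -> preferred hosts: [$hosts]")
}
```

An empty host list simply means the scheduler has no locality preference for that partition.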
spark.locality.wait. This setting lets you control the waiting time after which the decision about the level of locality is taken: Spark will wait for the configured interval for resources to become available. If, after the expiration of the spark.locality.wait interval, there are still no resources (cores) available on the preferred node, Spark will downgrade the locality level, e.g. PROCESS_LOCAL -> NODE_LOCAL, and the same will happen with the new, downgraded level until the required resources are met. On the other side, one way to upgrade the task locality is to add more resources, e.g. add a new executor; the tests found here (line 915) demonstrate this scenario. The default value is 3s; if you think you should give your tasks more time, you might decide to increase this value, although that is not suggested (it can inefficiently increase Spark's idle time).
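For illustration, here is how these knobs could be set when building a session; the keys are real Spark settings, but the values are placeholders, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: the config keys are real; the values are illustrative only.
val spark = SparkSession.builder()
  .appName("locality-wait-demo")
  // global fallback: how long to wait before downgrading a locality level
  .config("spark.locality.wait", "6s")
  // optional per-level overrides (each defaults to spark.locality.wait)
  .config("spark.locality.wait.process", "6s") // PROCESS_LOCAL -> NODE_LOCAL
  .config("spark.locality.wait.node", "6s")    // NODE_LOCAL -> RACK_LOCAL
  .config("spark.locality.wait.rack", "3s")    // RACK_LOCAL -> ANY
  .getOrCreate()
```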
My final advice to improve the locality would be to make Spark aware of the location of the partitions by using repartition() + persist() or cache().
Note that the persistence will take effect only after the first call of an action.
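A minimal sketch of that advice; df stands for the dataframe from the question, and the partition count of 25 is only an illustrative assumption (5 executors x 5 cores from the submitted configuration):

```scala
// Sketch: redistribute the data evenly, then pin it to the executors.
val balanced = df.repartition(25).persist() // Dataset default: MEMORY_AND_DISK

balanced.count() // the first action materializes the cache (per the note above)
balanced.count() // later actions read the already well-placed cached partitions
```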
Useful links:
https://www.waitingforcode.com/apache-spark/spark-data-locality/read