Tags: apache-spark, cluster-computing, hadoop-yarn

Count operation resulting in more rack_local pyspark


I am trying to understand locality levels on a Spark cluster and how they relate to the number of RDD partitions and to the action performed on them. Specifically, I have a DataFrame with 9647 partitions. I ran df.count() on it and observed the following in the Spark UI:

[Spark UI screenshot: task locality levels for the count stage]

For context, I submitted the job to a YARN cluster with the following configuration:

- executor_memory='10g',
- driver_memory='10g',
- num_executors='5',
- executor_cores='5'

I also noticed that the 5 executors were running on 5 different nodes (hosts).

From the figure, more than 95% of the 9644 tasks did not run on the node that holds their data. So I am wondering why there are so many RACK_LOCAL tasks. Specifically, why doesn't each task run on the node closest to its data source, which would give more NODE_LOCAL tasks?

Thank you


Solution

  • Here are some of the factors that affect data locality in Spark:

    1. Spark first tries to place each task as close as possible to the node where its source data lives. For instance, if the source is HDFS, Spark tries to run the task on the same node that holds the data for that partition. Each RDD reports its preferred locations through getPreferredLocations, which is defined in the RDD base class and overridden by concrete RDDs. The TaskScheduler later uses this information to decide the locality of each task. For example, when the source is HDFS, Spark creates a HadoopRDD (or NewHadoopRDD), which overrides getPreferredLocations and calls the Hadoop API to find out where the source file blocks are located.
    2. The main reason for not reaching a high locality level (e.g. PROCESS_LOCAL or NODE_LOCAL) is a lack of free resources on the target node. The spark.locality.wait setting controls how long Spark waits for resources to become available at the current locality level. If no resources (cores) are available on the node when that interval expires, Spark downgrades the locality level (e.g. PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY) and repeats the wait at each downgraded level until a slot is found. Conversely, one way to upgrade task locality is to add more resources, e.g. a new executor. The tests found here (line 915) demonstrate this scenario. The default value is 3s. If you think your tasks should be given more time you can increase it, although this is generally not recommended, since it can leave executors idle for longer.
    3. If your data lives outside the Spark cluster, the locality level will be set to ANY.
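
The downgrade behaviour described in point 2 can be sketched as a toy model. This is not Spark's scheduler code: the function name allowed_locality, the single wait_per_level argument, and the omission of the NO_PREF level are simplifying assumptions made for illustration. The setting names returned by locality_wait_conf are real Spark settings, but the helper itself is hypothetical.

```python
# Locality levels from best to worst (NO_PREF omitted for simplicity).
LOCALITY_LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def allowed_locality(seconds_waiting, wait_per_level=3.0):
    """Toy model: the most relaxed level accepted after waiting this long.

    Each time `wait_per_level` seconds pass without a task being launched,
    the scheduler gives up on the current level and moves one step down.
    """
    steps = max(0, int(seconds_waiting // wait_per_level))
    return LOCALITY_LEVELS[min(steps, len(LOCALITY_LEVELS) - 1)]


def locality_wait_conf(wait="3s"):
    """Hypothetical helper: settings to pass via SparkConf or --conf."""
    return {
        "spark.locality.wait": wait,       # base wait used for every level
        "spark.locality.wait.node": wait,  # override for NODE_LOCAL
        "spark.locality.wait.rack": wait,  # override for RACK_LOCAL
    }
```

In this model, with the default 3s wait, a task that cannot get a slot on its preferred node is allowed to run as RACK_LOCAL after about 6 seconds, and anywhere after about 9 seconds.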

    My final advice for improving locality is to make Spark aware of where the partitions are by using repartition() followed by persist() or cache().

    Note that persistence only takes effect after the first action is called.
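
A minimal sketch of the repartition() + persist() advice, assuming df is the DataFrame from the question. The helper names and the three-tasks-per-core heuristic are assumptions for illustration, not part of the original answer. Pick a partition count that suits your data size.

```python
def suggested_partitions(num_executors, executor_cores, tasks_per_core=3):
    """Rule-of-thumb partition count: a few tasks per available core."""
    return num_executors * executor_cores * tasks_per_core


def repartition_and_cache(df, num_partitions):
    """Shuffle `df` into `num_partitions` partitions and cache them."""
    cached = df.repartition(num_partitions).persist()
    # persist() is lazy: the first action materializes the cached partitions.
    cached.count()
    return cached
```

For the cluster in the question, suggested_partitions(5, 5) gives 75, so df = repartition_and_cache(df, 75) would be one way to apply it. Later actions on the returned DataFrame read the cached partitions from the executors, which should let the scheduler run them as PROCESS_LOCAL.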

    Useful links:

    https://www.waitingforcode.com/apache-spark/spark-data-locality/read

    http://www.russellspitzer.com/2017/09/01/Spark-Locality/

    https://github.com/apache/spark/blob/0bb716bac38488bc216fbda29ce54e93751e641b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala