apache-spark, pyspark

Role of dynamicAllocation and executorAllocationRatio


I'm trying to understand the role of executorAllocationRatio. Suppose we have the Spark configurations below set for a job running on a cluster:

"spark.dynamicAllocation.enabled": "true", 
"spark.dynamicAllocation.executorAllocationRatio": "0.4"
"spark.executor.cores": "5"
"spark.dynamicAllocation.minExecutors": "6"
"spark.dynamicAllocation.maxExecutors": "30"
"spark.executor.instances": "30" 

Behaviour with the above configs:

  • What is the minimum number of executors and cores allocated to a job to start with? Is it 6 executors with 5 cores/executor (i.e. a total of 6*5=30 cores), or does executorAllocationRatio have any role to play here?
  • When dynamicAllocation is enabled, does it have any impact on cores? i.e. is 5 the minimum number of cores/executor, or can cores/executor also be lower than 5?

Solution

  • On an unconstrained cluster, the answer to the first part ("what is the minimum number of executors and cores allocated to a job to start with?") is 30 executors with 5 cores each = 150 cores, since spark.executor.instances is set to 30. executorAllocationRatio has no bearing on this number. However, if resources are scarce, you may see your application start scheduling before the whole initial demand is satisfied, depending on the spark.scheduler.maxRegisteredResourcesWaitingTime and spark.scheduler.minRegisteredResourcesRatio settings described in the Scheduling section of the Spark configuration docs.

    The second part of your question is simple: 5 is neither a minimum nor a maximum, it's the exact number of cores allocated to each executor.

    spark.dynamicAllocation.executorAllocationRatio=1 (the default) means that Spark will try to allocate P = 1.0 * N / T executors to process N pending tasks, where T is the number of cores per executor. A ratio X lower than 1.0 lowers the number of requested executors by the factor X. This setting does not affect or consider runtime resource availability. Another way to look at it: by default Spark requests a new executor for every pending task that stays in the queue for the duration of spark.dynamicAllocation.schedulerBacklogTimeout or spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, and a ratio X < 1 lets it request a new executor for every 1/X such tasks instead. The sketch below walks through this arithmetic with your 0.4 ratio.
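
    To make that arithmetic concrete, here is a small Python sketch of the formula (an approximation of the documented behaviour, not Spark's actual code); the pending-task count of 200 is just an assumed example:

        import math

        def requested_executors(pending_tasks, executor_cores, task_cpus=1,
                                allocation_ratio=1.0):
            """Approximate executor demand for a backlog of pending tasks."""
            tasks_per_executor = executor_cores // task_cpus
            return math.ceil(allocation_ratio * pending_tasks / tasks_per_executor)

        # With 5 cores/executor, 200 pending tasks need 200 / 5 = 40 executors
        # at full parallelism, but a 0.4 ratio scales the request down to
        # ceil(0.4 * 200 / 5) = 16.
        print(requested_executors(200, executor_cores=5))                        # 40
        print(requested_executors(200, executor_cores=5, allocation_ratio=0.4))  # 16

        # Either result is still clamped to the configured bounds, so with
        # maxExecutors=30 the request of 40 would be capped at 30.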