Tags: scala, apache-spark, hadoop-yarn

Why does Spark spin up more executors than available cores on the machine?


Spark should have spun 6 containers instead of 9 (see below)

I often see that Spark spins up more executors than it should on a given node. This spikes the load average on the node, which eventually results in lost executors or the node being marked as bad or unhealthy.

My Spark configuration is as follows:

  1. Each node has 8 cores and 32 GB of memory. yarn.nodemanager.resource.cpu-vcores is set to 6 on the master, core, and task nodes, meaning at most 6 executors should run on a node.
  2. Some jobs run with 2 cores and 8 GB per executor, and some with 1 core and 2 GB (see the sketch after this list) -- so even in the worst case a node should spin up at most 6 containers. However, as the image above shows, 9 containers are spun up.
  3. On Ganglia I see nodes at a load average of 19 (while each node has only 8 vcores). Because of this high load average, executors are getting lost: the driver does not receive heartbeats on time, and the executors themselves run slowly.
  4. We are using EMR 6.9; Spark 3.2.0
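
For reference, a minimal sketch of how one of the 2-core / 8 GB jobs above might be submitted. The exact flags are assumptions based on the sizes described, and the class and jar names are placeholders, not taken from the actual job scripts:

    # Hypothetical submission for a 2-core / 8 GB job; class and jar are placeholders
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class com.example.MyJob \
      --conf spark.executor.cores=2 \
      --conf spark.executor.memory=8g \
      my-job.jar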

Questions:

  1. Why did Spark spin up more executors than configured?
  2. Is it possible to limit the max executors to 6 in this case? (see the note after this list)
  3. Another node has 5 cores and 25 GB available, yet why didn't Spark/YARN choose to spin up executors there?
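
A note on question 2: Spark itself can only cap the number of executors per application, not per node; the per-node limit has to come from YARN (see the solution below). Assuming dynamic allocation is enabled, a per-application cap would look roughly like this as an EMR configuration (values are illustrative):

    [
      {
        "Classification": "spark-defaults",
        "Properties": {
          "spark.dynamicAllocation.enabled": "true",
          "spark.dynamicAllocation.maxExecutors": "6"
        }
      }
    ]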

Spark could have spun up new executors here instead

My startup configuration JSON for AWS EMR is as follows. It was applied to the node configurations of the master, task, and core nodes (the same JSON for all).

I set high values for the heartbeats and timeouts only to make sure executors are not lost. A job runs for 1-1.5 hours; if the executors can just hang on, the job completes, otherwise all 1.5 hours of effort is wasted.

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.resource.cpu-vcores": "6",
      "yarn.nodemanager.resource.memory-mb": "30000",
      "yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms":"60000",
      "yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms":"60000",
      "yarn.resourcemanager.nodemanagers.heartbeat-interval-ms":"60000",
      "yarn.nodemanager.health-checker.timeout-ms":"72000000",
      "yarn.nodemanager.health-checker.interval-ms":"36000000",
      "yarn.resourcemanager.application-timeouts.monitor.interval-ms":"180000"
    }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms":"60000",
      "yarn.app.mapreduce.am.hard-kill-timeout-ms":"600000"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.executor.heartbeatInterval": "600s",
      "spark.network.timeout":"7200s"
    }
  }
]
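
These spark-defaults values apply to every application on the cluster. If you prefer to relax the timeouts only for the long-running jobs, the same two settings can be passed per job instead, for example (class and jar names are placeholders):

    # Per-job alternative to the cluster-wide spark-defaults above
    spark-submit \
      --conf spark.executor.heartbeatInterval=600s \
      --conf spark.network.timeout=7200s \
      --class com.example.MyJob \
      my-job.jar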

Solution

  • I was finally able to find the fix. By default, YARN's capacity scheduler allocates containers based on memory alone, so it keeps placing executors on a node as long as memory is available and ignores the vcore limit. This can be fixed by setting the flag

    "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    

    in capacity-scheduler.xml (in the Hadoop configuration directory).
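
    On a plain Hadoop/YARN installation (outside EMR), the same setting goes into capacity-scheduler.xml in the standard Hadoop property format, roughly:

        <configuration>
          <property>
            <name>yarn.scheduler.capacity.resource-calculator</name>
            <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
          </property>
        </configuration>

    followed by a scheduler refresh (yarn rmadmin -refreshQueues) or, if in doubt, a ResourceManager restart to pick up the change.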

    In my case, because I am using EMR, the configuration below did the trick.

    [
      {
        "Classification": "yarn-site",
        "Properties": {
          "yarn.nodemanager.resource.cpu-vcores": "5"
        }
      },
      {
        "Classification": "capacity-scheduler",
        "Properties": {
          "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
      }
    ]
    

    With the above JSON in place, I verified that YARN no longer allocates more cores on a node to executors than configured.

    Steps (new EMR console): select the cluster > Configurations > Instance group configurations > select the master radio button > Reconfigure > paste the above JSON > Save.
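
    To spot-check the allocation from the command line, the YARN CLI can report per-node resource usage (the exact output format varies by Hadoop version):

        yarn node -list                # list the node IDs in the cluster
        yarn node -status <node-id>    # node report showing vcores/memory used vs. capacity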