java, multithreading, apache-spark, concurrency, hadoop-yarn

How many concurrent tasks run in one executor, and how does Spark handle multithreading among tasks in one executor?


In Spark, how many tasks are executed in parallel at a time? Related discussions exist in "How are stages split into tasks in Spark?" and "How DAG works under the covers in RDD?", but I did not find a clear conclusion there.

Consider the following scenarios (assume spark.task.cpus = 1, and ignore the vcore concept for simplicity):

  • 10 executors (2 cores/executor), 10 partitions => I think the number of concurrent tasks at a time is 10
  • 10 executors (2 cores/executor), 2 partitions => I think the number of concurrent tasks at a time is 2
  • 10 executors (2 cores/executor), 20 partitions => I think the number of concurrent tasks at a time is 20
  • 10 executors (1 core/executor), 20 partitions => I think the number of concurrent tasks at a time is 10

Am I correct? Regarding the 3rd case, will it be 20, considering multi-threading inside one executor (i.e. 2 threads because there are 2 cores)?
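
To make the rule I am assuming explicit, here is a tiny Scala sketch (plain arithmetic, not a Spark API) that computes the expected upper bound on concurrent tasks for the four cases above:

```scala
// Assumed rule (not an official Spark API, just the arithmetic behind the cases above):
// concurrent tasks = min(executors * coresPerExecutor / spark.task.cpus, partitions)
object ConcurrencyEstimate {
  def concurrentTasks(executors: Int, coresPerExecutor: Int,
                      taskCpus: Int, partitions: Int): Int =
    math.min(executors * coresPerExecutor / taskCpus, partitions)

  def main(args: Array[String]): Unit = {
    println(concurrentTasks(10, 2, 1, 10)) // case 1 -> 10
    println(concurrentTasks(10, 2, 1, 2))  // case 2 -> 2
    println(concurrentTasks(10, 2, 1, 20)) // case 3 -> 20
    println(concurrentTasks(10, 1, 1, 20)) // case 4 -> 10
  }
}
```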


UPDATE1

If the 3rd case is correct, it means:

  • when idle cores are available inside an executor, Spark can automatically decide to run multiple task threads in that executor
  • when there is only one core in the executor, multithreading won't happen in that executor.

If this is true, isn't Spark's behavior inside an executor somewhat non-deterministic (single-threaded vs. multi-threaded)?

Note that the code shipped from the driver to the executors may not have been written with atomicity in mind (e.g. it may not use the synchronized keyword).

How is this handled by Spark?
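
To make the concern concrete, here is a hypothetical sketch (the object names, app name, and counter are made up for illustration) of the kind of JVM-wide mutable state that two task threads in one executor could race on, versus the usual pattern where each task only touches its own partition:

```scala
import org.apache.spark.sql.SparkSession

object SharedState {
  // JVM-wide mutable state: shared by all task threads in the same executor JVM.
  // Two tasks running concurrently in one executor could race on this counter
  // unless access is synchronized (this is the atomicity concern above).
  var counter: Long = 0L
}

object ExecutorThreadingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("executor-threading-example") // hypothetical app name
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(1 to 1000, numSlices = 20)

    // Risky: the closure mutates executor-local JVM state that other
    // concurrently running task threads in the same executor can also touch.
    rdd.foreach { _ => SharedState.counter += 1 }

    // Typical pattern: each task only reads its own partition and produces its
    // own result; the driver combines the per-task results afterwards.
    val total = rdd.map(_ => 1L).reduce(_ + _)
    println(s"total = $total")

    spark.stop()
  }
}
```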


Solution

  • I think all 4 cases are correct, and the 4th case makes sense in practice ("overbooking" cores). We should normally aim for a factor of 2 to 4 for the number of partitions, i.e. the number of partitions should be 2 to 4 times the total number of CPU cores in the cluster (see the sketch after this answer).

    Regarding threading, 2 tasks running concurrently in one executor should not have multi-threading issues, because each task handles its own partition of the RDD.

    If spark.task.cpus = 2 is set, meaning 2 CPU cores per task, then IMO there could be race conditions (if the task code shares mutable vars), but usually we deal with immutable values like RDDs, so there should rarely be an issue either way.
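
As a rough illustration of that 2-to-4x guideline, here is a minimal sketch, assuming defaultParallelism reflects the total executor cores requested for the job (which is the usual case on YARN/standalone); the input path is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object PartitionSizing {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("partition-sizing-example") // hypothetical app name
      .getOrCreate()
    val sc = spark.sparkContext

    // defaultParallelism is normally the total executor cores on YARN/standalone,
    // so 3x puts the partition count inside the recommended 2-4x range.
    val targetPartitions = sc.defaultParallelism * 3

    val rdd = sc.textFile("hdfs:///tmp/input") // hypothetical input path
      .repartition(targetPartitions)

    println(s"partitions = ${rdd.getNumPartitions}")
    spark.stop()
  }
}
```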