Tags: performance, apache-spark, parallel-processing, google-cloud-dataproc

Spark partitioning on nodes with foreachPartition


I have a Spark cluster (Dataproc) with a master and 4 workers (2 preemptible). In my code I have something like this:

    JavaRDD<MyData> rdd_data = javaSparkContext.parallelize(myArray);
    rdd_data.foreachPartition(partitionOfRecords -> {
        while (partitionOfRecords.hasNext()) {
            MyData d = partitionOfRecords.next();
            LOG.info("my data: " + d.getId().toString());
        }
    });

myArray is composed of 1200 MyData objects. I don't understand why Spark uses only 2 cores, divides my array into 2 partitions, and doesn't use all 16 cores. Do I need to set the number of partitions?

Thanks in advance for any help.


Solution

Generally it's always a good idea to specify the number of partitions as the second argument to parallelize, since the optimal slicing of your dataset should really be independent of the particular shape of the cluster you're using, and Spark can at best use the current sizes of executors as a "hint".
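For example, a minimal sketch reusing the names from the question (the value 48 is an arbitrary illustration, not a recommended setting):

    // Pass the desired slice count explicitly as the second argument.
    JavaRDD<MyData> rdd_data = javaSparkContext.parallelize(myArray, 48);
    rdd_data.foreachPartition(partitionOfRecords -> {
        while (partitionOfRecords.hasNext()) {
            MyData d = partitionOfRecords.next();
            LOG.info("my data: " + d.getId().toString());
        }
    });

With an explicit second argument, the slice count no longer depends on how many executor cores happen to be registered at the moment parallelize is called.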

What you're seeing here is that Spark defaults to asking the taskScheduler for the current number of executor cores to use as the defaultParallelism, combined with the fact that in Dataproc Spark dynamic allocation is enabled. Dynamic allocation is important because otherwise a single job submitted to a cluster might specify max executors even while it sits idle, preventing other jobs from using those idle resources.
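For reference, these are the standard Spark properties behind that behavior. A minimal sketch of setting them explicitly (on Dataproc dynamic allocation is already enabled by the image, so this is only to make the knobs visible; the values are illustrative):

    SparkConf conf = new SparkConf()
            .setAppName("partition-demo")
            .set("spark.dynamicAllocation.enabled", "true")    // enabled by default on Dataproc
            .set("spark.dynamicAllocation.minExecutors", "1"); // floor for an idle job
    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);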

So on Dataproc, if you're using the default n1-standard-4 machine type, Dataproc configures 2 executors per machine and gives each executor 2 cores. The value of spark.dynamicAllocation.minExecutors should be 1, so your default job, upon startup without doing any work, would sit on 1 executor with 2 cores. Then the taskScheduler will report that 2 cores are currently reserved in total, and therefore defaultParallelism will be 2.
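You can check this for yourself at job startup; a quick sketch (defaultParallelism() is part of the Java API, and in the scenario above it would report 2 on a freshly started job):

    // Reports what parallelize() will use when no slice count is given.
    int slices = javaSparkContext.defaultParallelism();
    LOG.info("defaultParallelism at startup: " + slices);  // e.g. 2 on a fresh Dataproc job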

If you had a large cluster and you had already been running a job for a while (say, you have a map phase that runs for longer than 60 seconds), you'd expect dynamic allocation to have taken all available resources, so the next step of the job that uses defaultParallelism would then presumably be 16, which is the total number of cores on your cluster (or possibly 14, if 2 are consumed by an appmaster).

In practice, you probably want to parallelize into a larger number of partitions than the total cores available anyway. Then if there's any skew in how long each element takes to process, you get nice balancing: fast tasks finish and those executors can start taking on new partitions while the slow ones are still running, instead of everything waiting on a single slowest partition. It's common to choose a number of partitions anywhere from 2x the number of available cores to 100x or more.
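As a sketch of that sizing heuristic (the multiplier of 3 and the core count of 16 are arbitrary illustrations, not values prescribed by the answer):

    int totalCores = 16;                  // e.g. 4 workers x 4 vCPUs on n1-standard-4
    int numPartitions = totalCores * 3;   // small multiple of cores; 2x-100x is common
    JavaRDD<MyData> rdd_data = javaSparkContext.parallelize(myArray, numPartitions);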

Here's another related StackOverflow question: spark.default.parallelism for Parallelize RDD defaults to 2 for spark submit