With Adaptive Query Execution in Spark 3+, can we say that we don't need to set spark.sql.shuffle.partitions explicitly at different stages in the application, given that we have set
spark.sql.adaptive.coalescePartitions.initialPartitionNum?
As the Spark documentation says, the dynamic coalescing will be able to decide the number of partitions automatically.
https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalescing-post-shuffle-partitions
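For context, this is roughly how we set it (a minimal sketch; the values and app name are illustrative, not recommendations):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values only.
val spark = SparkSession.builder()
  .appName("aqe-coalesce-example")
  .config("spark.sql.adaptive.enabled", "true")                    // AQE (on by default since Spark 3.2)
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true") // post-shuffle coalesce
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
  .getOrCreate()
```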
In my understanding, spark.sql.shuffle.partitions is the number of partitions used in the shuffle process, not the number of partitions in the resulting DataFrame; the latter is decided by the default parallelism, coalesce, and repartition. In this context, the documentation confuses me a little: it says that AQE will automatically coalesce partitions after the shuffle and decide their number, and that you do not need to set a proper shuffle partition number to fit your dataset.
In my understanding, spark.sql.shuffle.partitions is the number of partitions used in the shuffle process and not for determining the number of partitions in the resulting DataFrame, the latter being decided by the default parallelism, coalesce, and repartition.

Actually, the number of partitions in the resulting DataFrame is determined by spark.sql.shuffle.partitions, unless there is a repartition or coalesce.
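This is easy to observe in local mode with AQE turned off (a hypothetical sketch; broadcast joins are disabled so the join actually shuffles):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "false")          // disable AQE to see the raw behaviour
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")   // force a shuffle (sort-merge) join
  .config("spark.sql.shuffle.partitions", "7")
  .getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b"), (1, "c")).toDF("col1", "col2")
val joined = df.join(df, Seq("col1"))

println(joined.rdd.getNumPartitions)               // 7 -- spark.sql.shuffle.partitions
println(joined.coalesce(2).rdd.getNumPartitions)   // 2 -- coalesce overrides it
```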
One of the main problems that the AQE (Adaptive Query Execution) mechanism aims to solve arises when spark.sql.shuffle.partitions is set to a high value in order to better parallelize the shuffle operation, and the output DataFrame is then written to the destination, for example:
val df1 = spark.read.parquet("...")
val df2 = spark.read.parquet("...")
df1.join(df2, Seq("col1")).write.parquet("...")
In the above example, the number of written files will be the same as spark.sql.shuffle.partitions, since the join operation yields a DataFrame with spark.sql.shuffle.partitions partitions.
If spark.sql.shuffle.partitions is set to a high value, a high number of files will be written to HDFS or the cloud object store.
AQE mitigates this while still allowing a high parallelism level for the shuffle operation itself: spark.sql.adaptive.coalescePartitions.initialPartitionNum sets the initial number of shuffle partitions (increasing this value means more shuffle partitions), which are afterwards coalesced (post-shuffle coalesce) according to spark.sql.adaptive.coalescePartitions.minPartitionSize and spark.sql.adaptive.advisoryPartitionSizeInBytes, yielding a DataFrame with fewer partitions. In the above example, the post-shuffle coalesce may help to decrease the number of written files.
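Putting it together, the setup described above might look like this (a sketch, not a tuned configuration; the size values are illustrative, and the "..." paths are placeholders as in the example above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  // Start the shuffle with high parallelism...
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "2000")
  // ...then coalesce post-shuffle partitions toward roughly this target size,
  // but never below the minimum partition size.
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
  .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "16m")
  .getOrCreate()

val df1 = spark.read.parquet("...")
val df2 = spark.read.parquet("...")
// With AQE on, the number of output files tracks the coalesced
// partition count, not initialPartitionNum.
df1.join(df2, Seq("col1")).write.parquet("...")
```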