Search code examples
apache-sparkpysparkapache-spark-sqlspark3

Adaptive Query Execution and Shuffle Partitions


With Adaptive Query Execution in Spark 3+ , can we say that, we don't need to set spark.sql.shuffle.partitions explicitly at different stages in the application ? Given that, we have set

spark.sql.adaptive.coalescePartitions.initialPartitionNum

As the Spark documentation says, that dynamic coalesce will be able to decide the number of partitions automatically.
https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalescing-post-shuffle-partitions

In my understanding, spark.sql.shuffle.partitions is the no of partitions used in the shuffle process and not for determining the number of partitions in the resulting dataframe, the latter will be decided based on default parallelism, coalesce and repartition. In this context, documentation confuses me a little , it says, it will automatically coalesce post shuffle process and decide the number of partitions, and you do not need to set a proper shuffle partition number to fit your dataset.


Solution

  • In my understanding, spark.sql.shuffle.partitions is the no of partitions used in the shuffle process and not for determining the number of partitions in the resulting dataframe, the latter will be decided based on default parallelism, coalesce and repartition.

    Actually the no of partitions in the resulting DataFrame is determined by spark.sql.shuffle.partitions unless there is repartition or coalesce.

    One of the main problems that the AQE (Adaptive Query Execution) mechanism aims to solve is when spark.sql.shuffle.partitions is set to a high value, in order to parallelize better the shuffle operation, and the output DataFrame is written to the destination, for example:

    val df1 = spark.read.parquet("...")
    val df2 = spark.read.parquet("...")
    
    df1.join(df2, Seq("col1")).write.parquet("...")
    

    In the above example, the number of the written files will be the same as spark.sql.shuffle.partitions since the join operation yields DataFrame with spark.sql.shuffle.partitions partitions. If spark.sql.shuffle.partitions set to a high value, there will be a high number of files written to the HDFS/cloud object store.

    AQE mitigates it by allowing high parallelism level for the shuffle operation itself by introducing spark.sql.adaptive.coalescePartitions.initialPartitionNum (increasing this value means more shuffle partitions) which afterwards will be coalesced (post shuffle coalesce) according to spark.sql.adaptive.coalescePartitions.minPartitionSize and spark.sql.adaptive.advisoryPartitionSizeInBytes, yielding DataFrame with less partitions. In the above example, the post-shuffle coalesce may help to decrease the number of written files.