I use a global sort on my Spark DataFrame, and when I enable AQE and post-shuffle coalesce, the partitions after the sort operation become even more unevenly distributed than before.
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
"spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
"spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
My query, at a high level, looks like this:
.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
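In Scala it is roughly the following sketch; the schema, topic, broker address, column names and output format are placeholders, not the real ones:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Hypothetical schema; the real topic, schema, salt and sort columns are not shown here.
val schema: StructType = new StructType()
  .add("const_part", "string")
  .add("skewed_col", "string")
  .add("salt", "int")

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "my_topic")                  // placeholder
  .load()
  .select(from_json(col("value").cast("string"), schema).as("row"))
  .select("row.*")
  .cache()

df.sort(col("const_part"), col("skewed_col"), col("salt"))
  .write
  .format("parquet")                // assumed output format
  .save("s3a://my-bucket/output/")  // placeholder path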
My first assumption was that even a small range contains too big a spike. But I checked and confirmed that repartitioning by range gives me a good distribution in record counts, but a bad one in size: I get nearly 200 partitions with almost the same number of records and size differences of up to 9x, from ~100 MB to ~900 MB. But with AQE and a repartition into 18000 small ranges, the smallest partition was 18 MiB and the biggest 1.8 GiB. This is much worse than without AQE. It's important to highlight that I use the metrics from the Spark UI -> Details for Stage tab to identify partition sizes in bytes, and my own logs for record counts.
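Here is a sketch of the record-count side of that check (using the df and placeholder columns from the sketch above); the byte sizes still come from the Spark UI, since they are a shuffle metric:

import org.apache.spark.sql.functions.col

// Range-repartition into ~200 partitions on the same (placeholder) sort keys
// and count records per partition; sizes in bytes are read off the Spark UI.
val ranged = df.repartitionByRange(200, col("const_part"), col("skewed_col"), col("salt"))

val recordsPerPartition = ranged.rdd
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()
  .sortBy(_._1)

recordsPerPartition.foreach { case (idx, n) => println(s"partition $idx -> $n records") }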
So I started to debug the issue, but AQE doesn't have enough logging around the input and output of ShufflePartitionsUtil.coalescePartitions.
That's why I rewrote my query to repartitionByRange(...).sortWithinPartitions(...) and forked the physical plan optimization with additional logging.
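The rewritten query looks roughly like this (same placeholder columns and output path as above):

import org.apache.spark.sql.functions.col

// Roughly equivalent to the global sort: an explicit range repartition into
// 18000 ranges followed by a per-partition sort.
df.repartitionByRange(18000, col("const_part"), col("skewed_col"), col("salt"))
  .sortWithinPartitions(col("const_part"), col("skewed_col"), col("salt"))
  .write
  .format("parquet")                // assumed output format
  .save("s3a://my-bucket/output/")  // placeholder path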
My logs showed that my initial idea was right:
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435
And
Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition maxsize :312832323
Output partition min size :103832323
The min size is so different because of the size of the last partition, which is expected. TRACE log level shows that 99% of partitions are near 290 MiB.
But why does the Spark UI show such different results? ->
Could the Spark UI be wrong? ->
Maybe, but besides task size, task duration is also too big, which makes me think the Spark UI is fine.
So my assumption is that the issue is with MapOutputStatistics in my stage. But is it always broken, or only in my case? -> I made a few checks to confirm it, and all of them make me think that MapOutputStatistics is wrong only in my case. Maybe the issue is somehow related to the Kafka source, or to the fact that my Kafka input data is very unevenly distributed.
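For illustration, one possible sanity check that does not rely on MapOutputStatistics (a sketch, not necessarily the exact checks above) is to approximate per-partition byte sizes from the serialized rows themselves:

import org.apache.spark.sql.functions.{col, length, spark_partition_id, struct, sum, to_json}

// Approximate the byte size of every post-sort partition by summing the
// length of a JSON-serialized copy of each row, grouped by partition id.
val sorted = df.sort(col("const_part"), col("skewed_col"), col("salt"))

sorted
  .withColumn("part_id", spark_partition_id())
  .withColumn("approx_bytes", length(to_json(struct(sorted.columns.map(col): _*))))
  .groupBy("part_id")
  .agg(sum("approx_bytes").as("approx_partition_bytes"))
  .orderBy("part_id")
  .show(200, truncate = false)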
Questions:
P.S.
I also want to mention that my input Kafka DataFrame has 2160 unevenly distributed partitions -> some partitions can be 2x bigger than others. It is read from a Kafka topic with 720 partitions and the minPartitions option set to 720 * 3.
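The read itself looks roughly like this (broker and topic are placeholders):

// Kafka batch read; the topic has 720 partitions, and minPartitions = 720 * 3
// asks Spark to split them into at least 2160 input partitions.
val kafkaDf = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "my_topic")                  // placeholder, 720 partitions
  .option("minPartitions", "2160")                  // 720 * 3
  .load()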
Here is the answer: https://www.mail-archive.com/[email protected]/msg26851.html
The worst case of enabling AQE in cached data is not losing the opportunity of using/reusing the cache, but rather just an extra shuffle if the outputPartitioning happens to match without AQE and not match after AQE. The chance of this happening is rather low.