apache-spark apache-spark-sql spark3 catalyst-optimizer

Why would finding an aggregate of a partition column in Spark 3 take a very long time?


I'm trying to get MIN(dt) from a table partitioned by the dt column, using the following query in both Spark 2 and Spark 3:

SELECT MIN(dt) FROM table_name

The table is stored in parquet format in S3, where each dt is a separate folder, so this seems like a pretty simple operation. There's about 3,200 days of data.

In Spark 2, this query completes in ~1 minute, while in Spark 3 it takes over an hour (I'm not sure exactly how long, since it hasn't finished yet).

In Spark 3, the execution plan is:

AdaptiveSparkPlan (10)
+- == Current Plan ==
   HashAggregate (6)
   +- ShuffleQueryStage (5)
      +- Exchange (4)
         +- * HashAggregate (3)
            +- * ColumnarToRow (2)
               +- Scan parquet table_name (1)
+- == Initial Plan ==
   HashAggregate (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Scan parquet table_name (1)

It's confusing to me how this would take a long time, as the data is already partitioned by dt. Spark only needs to determine which partitions have any rows and return the min of those.


Solution

  • What you're suggesting was once implemented as the OptimizeMetadataOnlyQuery optimizer rule, via JIRA SPARK-15752 "Optimize metadata only query that has an aggregate whose children are deterministic project or filter operators".

    However, it was found to cause correctness issues when some of the partitions contained zero-row files; see JIRA SPARK-26709 "OptimizeMetadataOnlyQuery does not correctly handle the files with zero record".

    Along with the fix, an internal Spark config, spark.sql.optimizer.metadataOnly, was added to provide a way to circumvent full-table scans "at your own risk", i.e. when you are certain that none of your partitions is empty. Possibly your Spark 2 has it set to true (or your Spark 2 doesn't include the fix at all). See also SPARK-34194 for additional discussion around it.

    Spark 3.0 deprecated this config (SPARK-31647), so most likely it is set to false in your environment, which causes Spark to scan all table partitions before aggregating the result to find the minimum. For the time being, you can still try setting it to true to speed up your query; just beware of the consequences.
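
A minimal sketch of the workaround described above, assuming a Spark SQL session in which the deprecated setting is still accepted, and assuming that every dt partition holds at least one row. table_name is the placeholder name from the question.

```sql
-- Assumption: no dt partition is empty or made up only of zero-row files;
-- otherwise the metadata-only result can be wrong (see SPARK-26709).
SET spark.sql.optimizer.metadataOnly=true;

-- With the flag on, the optimizer may answer this from partition metadata
-- instead of scanning the parquet files.
SELECT MIN(dt) FROM table_name;

-- Turn the flag back off so later queries are not affected.
SET spark.sql.optimizer.metadataOnly=false;
```

Running EXPLAIN on the query before and after the first SET should show whether the parquet scan is still part of the plan.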