I’m running a Spark SELECT query on a Delta Lake table partitioned by year, month, day, and hour, all derived from a timestamp column. When I execute the query in Zeppelin, Spark is aware of the partitions and applies partition filters as expected. However, when I run the same query on the same table from my own Spark application (written in Java), the physical plan shows no awareness of the partitions, leading to a less efficient query. According to this documentation, Delta Lake should automatically leverage partitions when using generated columns.
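For context, the table is defined roughly as sketched below, using the Delta table-builder API. The column and partition names are taken from the physical plans further down; the exact DDL, path, and generation expressions are assumptions.
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

// Minimal sketch of the table definition (assumed; only the names come from the plans below).
SparkSession spark = SparkSession.active();
DeltaTable.createIfNotExists(spark)
    .location("tablepath") // placeholder path
    .addColumn("totVol", "DOUBLE")
    .addColumn("maxDowBan", "DOUBLE")
    .addColumn("devMod", "STRING")
    .addColumn("coAt", "TIMESTAMP")
    // partition columns generated from the timestamp column
    .addColumn(DeltaTable.columnBuilder("__year__").dataType("INT")
        .generatedAlwaysAs("YEAR(coAt)").build())
    .addColumn(DeltaTable.columnBuilder("__month__").dataType("INT")
        .generatedAlwaysAs("MONTH(coAt)").build())
    .addColumn(DeltaTable.columnBuilder("__day__").dataType("INT")
        .generatedAlwaysAs("DAY(coAt)").build())
    .addColumn(DeltaTable.columnBuilder("__hour__").dataType("INT")
        .generatedAlwaysAs("HOUR(coAt)").build())
    .partitionedBy("__year__", "__month__", "__day__", "__hour__")
    .execute();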
Example query:
%sql
SELECT *
FROM delta.`tablepath`
WHERE coAt < '2023-05-22 13:00:00.0';
Physical plan:
== Physical Plan ==
Project (5)
+- Filter (4)
   +- Filter (3)
      +- Filter (2)
         +- Scan ExistingRDD Delta Table State with Stats #58 - hdfs:<path> (1)
(1) Scan ExistingRDD Delta Table State with Stats #58 - hdfs:<path>
Output [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Arguments: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846], Delta Table State with Stats #58 - <path> MapPartitionsRDD[2934] at $anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:77, ExistingRDD, UnknownPartitioning(0)
(2) Filter
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Condition : UDF(true, size#112842L, stats#112855.numRecords)
(3) Filter
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Condition : (NOT coalesce(isnull(((((cast(partitionValues#112841[__year__] as int) < 2023) OR ((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) < 5))) OR (((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) < 22))) OR ((((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) = 22)) AND (cast(partitionValues#112841[__hour__] as int) <= 13)))), false) AND UDF(knownnotnull(((((cast(partitionValues#112841[__year__] as int) < 2023) OR ((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) < 5))) OR (((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) < 22))) OR ((((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) = 22)) AND (cast(partitionValues#112841[__hour__] as int) <= 13)))), size#112842L, stats#112855.numRecords))
(4) Filter
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Condition : UDF(true, size#112842L, stats#112855.numRecords)
(5) Project
Output [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, null AS stats#112907, tags#112846]
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
In my Java application, I run the same query with spark.sql() and save the result to a Dataset<Row>.
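Roughly how the query is executed in the app (session and table path as above; the explain call is only there to show the plan):
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Same query as in Zeppelin, executed through spark.sql() in the Java app.
Dataset<Row> df = spark.sql(
    "SELECT * FROM delta.`tablepath` WHERE coAt < '2023-05-22 13:00:00.0'");
df.explain("formatted"); // prints the physical plan below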
This does not apply partition filters. Physical plan:
== Physical Plan ==
DeserializeToObject (5)
+- * Project (4)
   +- * Filter (3)
      +- * ColumnarToRow (2)
         +- Scan parquet (1)
(1) Scan parquet
Output [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
Batched: true
Location: TahoeLogFileIndex [hdfs:<path>]
PushedFilters: [IsNotNull(coAt), LessThan(coAt,2023-05-22 13:00:00.0)]
ReadSchema: struct<totVol:double,maxDowBan:double,devMod:string,coAt:timestamp>
(2) ColumnarToRow [codegen id : 1]
Input [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
(3) Filter [codegen id : 1]
Input [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
Condition : (isnotnull(coAt#1187) AND (coAt#1187 < 2023-05-22 13:00:00))
(4) Project [codegen id : 1]
Output [3]: [totVol#1088, maxDowBan#1089, devMod#1110]
Input [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
(5) DeserializeToObject
Input [3]: [totVol#1088, maxDowBan#1089, devMod#1110]
Arguments: createexternalrow(totVol#1088, maxDowBan#1089, devMod#1110.toString, StructField(totVol,DoubleType,true), StructField(maxDowBan,DoubleType,true), StructField(devMod,StringType,true)), obj#1358: org.apache.spark.sql.Row
I tried converting the timestamp column's data type from int96 to int64 to enable metadata (min/max statistics) calculation, which improved query time significantly. However, I still expect Spark to use the partitioning so it can skip irrelevant Parquet files in the first place.
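A minimal sketch of how the int64 timestamp encoding can be enabled on the writer side (assuming the data is also written with Spark; the setting only affects newly written files):
// Write timestamps as int64 (TIMESTAMP_MICROS) instead of the default int96,
// so min/max statistics can be collected and used for data skipping on coAt.
spark.conf().set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");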
The same issue occurs when I use a DELETE query in both Zeppelin and locally. Could this also be related to the same problem, or should DELETE queries behave differently in this context?
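For illustration, the DELETE is of roughly this form (the exact statement is assumed; only the predicate mirrors the SELECT above):
// Assumed shape of the DELETE; the real statement may differ.
spark.sql("DELETE FROM delta.`tablepath` WHERE coAt < '2023-05-22 13:00:00.0'");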
Both environments use the same Spark 3.2.1 and delta-core_2.12 versions.
The issue was due to missing Spark configurations. While Spark executed the SELECT query, it didn’t leverage partitioning until I explicitly set:
sparkConf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension");
sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");
spark.sql.extensions: Enables Delta Lake optimizations.
spark.sql.catalog.spark_catalog: Ensures Spark treats the table as a Delta table, allowing partition pruning.
Without these, Spark treated the table as a generic Parquet table, ignoring partitioning. After adding them, partition filters appeared in the physical plan, improving query efficiency. Zeppelin worked because it already had these configs set.
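For completeness, the same configs can also be set directly on the SparkSession builder (the app name is a placeholder):
import org.apache.spark.sql.SparkSession;

// Equivalent setup via the SparkSession builder.
SparkSession spark = SparkSession.builder()
    .appName("delta-app") // placeholder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate();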
However, DELETE queries still don’t apply partition pruning, even with these configs. The physical plan remains the same, causing a full table scan. Is this expected behavior in Delta Lake, or is there a way to optimize DELETE operations?
Would appreciate any insights!