Tags: apache-spark, apache-spark-sql, parquet, delta-lake

Spark SELECT Query Ignores Partition Filters in Java Spark App but Works in Zeppelin


I’m running a Spark SELECT query on a Delta Lake table partitioned by year, month, day, and hour, all derived from a timestamp column. When I execute the query in Zeppelin, Spark is aware of the partitions and applies partition filters as expected. However, when I run the same query on the same table from my own Spark application (written in Java), the physical plan shows no awareness of the partitions, leading to a far less efficient query. According to this documentation, Delta Lake should automatically leverage partitions when using generated columns.
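
For context, the table is defined with Delta generated columns along these lines (a minimal sketch using the DeltaTable builder API; the location and the non-partition columns are illustrative, only coAt and the partition column names appear in the plans below):

import io.delta.tables.DeltaTable;

// Hypothetical definition: partition columns generated from coAt, so Delta
// can translate filters on coAt into partition filters automatically.
DeltaTable.createOrReplace(spark)
    .location("tablepath")
    .addColumn("coAt", "TIMESTAMP")
    .addColumn(DeltaTable.columnBuilder("__year__")
        .dataType("INT").generatedAlwaysAs("YEAR(coAt)").build())
    .addColumn(DeltaTable.columnBuilder("__month__")
        .dataType("INT").generatedAlwaysAs("MONTH(coAt)").build())
    .addColumn(DeltaTable.columnBuilder("__day__")
        .dataType("INT").generatedAlwaysAs("DAY(coAt)").build())
    .addColumn(DeltaTable.columnBuilder("__hour__")
        .dataType("INT").generatedAlwaysAs("HOUR(coAt)").build())
    .partitionedBy("__year__", "__month__", "__day__", "__hour__")
    .execute();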

In Zeppelin:

  • The SELECT query applies partition filters and prunes irrelevant partitions based on the filter condition.
  • The query is optimized, with a reduced number of rows scanned.

Example query:

%sql
SELECT *
FROM delta.`tablepath`
WHERE coAt < '2023-05-22 13:00:00.0';

Physical plan:

== Physical Plan ==
Project (5)
+- Filter (4)
   +- Filter (3)
      +- Filter (2)
         +- Scan ExistingRDD Delta Table State with Stats #58 - hdfs:<path> (1)
(1) Scan ExistingRDD Delta Table State with Stats #58 - hdfs:<path>
Output [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Arguments: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846], Delta Table State with Stats #58 - <path> MapPartitionsRDD[2934] at $anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:77, ExistingRDD, UnknownPartitioning(0)
(2) Filter
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Condition : UDF(true, size#112842L, stats#112855.numRecords)
(3) Filter
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Condition : (NOT coalesce(isnull(((((cast(partitionValues#112841[__year__] as int) < 2023) OR ((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) < 5))) OR (((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) < 22))) OR ((((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) = 22)) AND (cast(partitionValues#112841[__hour__] as int) <= 13)))), false) AND UDF(knownnotnull(((((cast(partitionValues#112841[__year__] as int) < 2023) OR ((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) < 5))) OR (((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) < 22))) OR ((((cast(partitionValues#112841[__year__] as int) = 2023) AND (cast(partitionValues#112841[__month__] as int) = 5)) AND (cast(partitionValues#112841[__day__] as int) = 22)) AND (cast(partitionValues#112841[__hour__] as int) <= 13)))), size#112842L, stats#112855.numRecords))
(4) Filter
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]
Condition : UDF(true, size#112842L, stats#112855.numRecords)
(5) Project
Output [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, null AS stats#112907, tags#112846]
Input [7]: [path#112840, partitionValues#112841, size#112842L, modificationTime#112843L, dataChange#112844, stats#112855, tags#112846]

In Local Spark Streaming App (Java):

  • The same query, run through spark.sql() and stored in a Dataset<Row>, does not apply partition filters (see the snippet below).
  • The physical plan shows no partition pruning and results in a full table scan, even though the filter condition lies entirely outside the data’s bounds and should return an empty result.
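
For reference, the query is issued from Java roughly like this (a sketch; variable names are illustrative, the SQL matches the Zeppelin query):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Same query as in Zeppelin, run through spark.sql() in the Java app.
Dataset<Row> df = spark.sql(
    "SELECT * FROM delta.`tablepath` " +
    "WHERE coAt < '2023-05-22 13:00:00.0'");
df.explain("formatted"); // prints the physical plan shown below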

Physical plan:

== Physical Plan ==
DeserializeToObject (5)
+- * Project (4)
   +- * Filter (3)
      +- * ColumnarToRow (2)
         +- Scan parquet  (1)
(1) Scan parquet 
Output [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
Batched: true
Location: TahoeLogFileIndex [hdfs:<path>]
PushedFilters: [IsNotNull(coAt), LessThan(coAt,2023-05-22 13:00:00.0)]
ReadSchema: struct<totVol:double,maxDowBan:double,devMod:string,coAt:timestamp>
(2) ColumnarToRow [codegen id : 1]
Input [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
(3) Filter [codegen id : 1]
Input [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
Condition : (isnotnull(coAt#1187) AND (coAt#1187 < 2023-05-22 13:00:00))
(4) Project [codegen id : 1]
Output [3]: [totVol#1088, maxDowBan#1089, devMod#1110]
Input [8]: [totVol#1088, maxDowBan#1089, devMod#1110, coAt#1187, __year__#1217, __month__#1218, __day__#1219, __hour__#1220]
(5) DeserializeToObject
Input [3]: [totVol#1088, maxDowBan#1089, devMod#1110]
Arguments: createexternalrow(totVol#1088, maxDowBan#1089, devMod#1110.toString, StructField(totVol,DoubleType,true), StructField(maxDowBan,DoubleType,true), StructField(devMod,StringType,true)), obj#1358: org.apache.spark.sql.Row

  • I tried converting the timestamp column from INT96 to INT64 so that Parquet metadata can be used for filtering, which improved the query time significantly (see the config sketch after this list). However, I still expect Spark to leverage the partitioning and avoid scanning the Parquet files in the first place.

  • The same issue occurs with DELETE queries, both in Zeppelin and locally. Could this be related to the same problem, or should DELETE queries behave differently in this context?

  • Both environments use the same Spark (3.2.1) and Delta Core (delta-core_2.12) versions.
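
The INT96 → INT64 conversion mentioned above can be done with a standard Spark setting (a sketch; spark.sql.parquet.outputTimestampType is Spark’s own config key, the placement on sparkConf mirrors the solution below):

// Write timestamps as INT64 microseconds instead of legacy INT96, so
// Parquet min/max statistics and filter pushdown can apply to coAt.
// Note: this affects newly written files only; old files keep INT96.
sparkConf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS");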


Solution

  • The issue was due to missing Spark configurations. While Spark executed the SELECT query, it didn’t leverage partitioning until I explicitly set:

    sparkConf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension");
    sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");
    

    Why This Fix Works:

    • spark.sql.extensions: registers Delta’s session extension, which injects the planning and optimization rules Delta needs.
    • spark.sql.catalog.spark_catalog: routes table resolution through the Delta catalog, so Spark treats the table as a Delta table and partition pruning can kick in.

    Without these, Spark treated the table as a generic Parquet table and ignored the partitioning. After adding them, partition filters appeared in the physical plan and query efficiency improved. Zeppelin worked because it already had these configs set.
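
    For completeness, the session setup with the fix applied looks like this (a minimal sketch; the app name is a placeholder):

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    // The Delta extension and catalog must be set *before* the session is
    // created; setting them on an existing session has no effect.
    SparkConf sparkConf = new SparkConf()
        .setAppName("delta-partition-pruning") // placeholder
        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");

    SparkSession spark = SparkSession.builder()
        .config(sparkConf)
        .getOrCreate();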

    Unresolved Issue: DELETE Queries

    However, DELETE queries still don’t apply partition pruning, even with these configs. The physical plan remains the same, causing a full table scan. Is this expected behavior in Delta Lake, or is there a way to optimize DELETE operations?
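
    For reference, the DELETE in question is run along these lines (a sketch; the predicate mirrors the SELECT above):

    // Hypothetical reproduction of the DELETE; even with the configs set,
    // its plan still shows a full scan rather than partition pruning.
    spark.sql(
        "DELETE FROM delta.`tablepath` " +
        "WHERE coAt < '2023-05-22 13:00:00.0'");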

    Would appreciate any insights!