My team is building an ETL process to load raw delimited text files into a Parquet based "data lake" using Spark. One of the promises of the Parquet column store is that a query will only read the necessary "column stripes".
But we're seeing unexpected columns being read for nested schema structures.
To demonstrate, here is a POC using Scala and the Spark 2.0.1 shell:
// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._
// Create a schema with nested complex structures
val schema = StructType(Seq(
StructField("F1", IntegerType),
StructField("F2", IntegerType),
StructField("Orig", StructType(Seq(
StructField("F1", StringType),
StructField("F2", StringType))))))
// Create some sample data
val data = spark.createDataFrame(
sc.parallelize(Seq(
Row(1, 2, Row("1", "2")),
Row(3, null, Row("3", "ABC")))),
schema)
// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
Then we read the file back into a DataFrame and project to a subset of columns:
// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")
// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show
When this runs we see the expected output:
+---+-------+
| F1|Orig_F1|
+---+-------+
| 1| 1|
| 3| 3|
+---+-------+
But... the query plan shows a slightly different story:
The "optimized plan" shows:
val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet
And "explain" shows:
projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>
And the INFO logs produced during execution also confirm that the Orig.F2 column is unexpectedly read:
16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:
Parquet form:
message spark_schema {
optional int32 F1;
optional group Orig {
optional binary F1 (UTF8);
optional binary F2 (UTF8);
}
}
Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))
According to the Dremel paper and the Parquet documentation, columns for complex nested structures should be independently stored and independently retrievable.
Questions:
Possibly related: Why does the query performance differ with nested columns in Spark SQL?
The issue has been fixed since Spark 2.4.0. This applies to struct as well as array of structs.
Before Spark 3.0.0:
Set spark.sql.optimizer.nestedSchemaPruning.enabled
to true
See related Jira here: https://issues.apache.org/jira/browse/SPARK-4502
After Spark 3.0.0:
spark.sql.optimizer.nestedSchemaPruning.enabled
now default is true
Related Jira here: https://issues.apache.org/jira/browse/SPARK-29805
Also related SO question: Efficient reading nested parquet column in Spark