I have the following (simplified) schema:
root
|-- event: struct (nullable = true)
| |-- spent: struct (nullable = true)
| | |-- amount: decimal(34,3) (nullable = true)
| | |-- currency: string (nullable = true)
| |
| |-- ... (~20 other struct fields at the "event" level)
I'm trying to sum on a nested field:
spark.sql("select sum(event.spent.amount) from event")
According to Spark metrics, I'm reading 18 GB from disk and it takes 2.5 min.
However, when I select the top-level field:
spark.sql("select sum(amount) from event")
I read only 2 GB in 4 seconds.
From the physical plan I can see that, in the nested case, the whole event struct with all its fields is read from Parquet, which is a waste.
The Parquet format should be able to provide the desired column from a nested structure without reading all of it (which is the point of a columnar store). Is there some way to do this efficiently in Spark?
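For reference, a quick way to see what is actually read is to inspect the ReadSchema reported by the Parquet scan node in the physical plan (a small sketch, assuming the same event table as above):
// Sketch: check which columns the Parquet scan requests.
// Without nested pruning, ReadSchema lists the full "event" struct instead of just spent.amount.
spark.sql("select sum(event.spent.amount) from event").explain()
// Look for the "FileScan parquet" node and its ReadSchema attribute in the output.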
Solution:
spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)")
The query must be written in a sub-select fashion: you can't wrap the selected column directly in an aggregate function. The following query will break schema pruning:
select sum(event.spent.amount) as amount from event
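For completeness, a minimal sketch of enabling the flag programmatically and verifying the effect via the physical plan (assuming Spark 2.4+, where the flag was introduced; it is on by default since 3.0):
// Sketch: enable nested schema pruning and confirm it took effect.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
val pruned = spark.sql(
  "select sum(amount) from (select event.spent.amount as amount from event_archive)")
pruned.explain()
// With pruning in effect, the FileScan node's ReadSchema should contain only
// struct<event:struct<spent:struct<amount:decimal(34,3)>>> rather than the full event struct.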
The whole schema-pruning effort is tracked in SPARK-4502.
A dirty workaround is also specifying a "projected schema" at load time:
import org.apache.spark.sql.types._

// Projected schema: only the fields listed here are read from Parquet,
// the remaining ~20 struct fields of "event" are skipped entirely.
val amountType = DataTypes.createDecimalType(34, 3) // matches decimal(34,3) from the file schema
val schema = StructType(
  StructField("event", StructType(
    StructField("spent", StructType(
      StructField("amount", amountType, true) :: Nil
    ), true) :: Nil
  ), true) :: Nil
)
val df = spark.read.format("parquet").schema(schema).load("<path>")
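With the projected schema in place, the rest of the pipeline stays unchanged; a small usage sketch (the view name is just an example):
// Sketch: query the projected-schema DataFrame; only event.spent.amount is materialized from Parquet.
df.createOrReplaceTempView("event_pruned")
spark.sql("select sum(event.spent.amount) from event_pruned").show()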