I have a dataset with several columns and an id column. The dataset is sorted by id (also verified with parquet-tools), for example:
file 1: ID 1-10
file 2: ID 10-12
file 3: ID 12-33
....
I also generated and wrote the _metadata and _common_metadata files. I tried querying the (very big) dataset using a filter:
val mydata=spark.read.parquet("s3a://.../mylocation")
val result = mydata.filter(mydata("id") === 11)
result.explain(true)
The explain output showed:
== Parsed Logical Plan ==
Filter (id#14L = 11)
+- Relation[fieldA#12, fieldB#13,id#14L] parquet
== Analyzed Logical Plan ==
fieldA: int, fieldB: string, id: bigint
Filter (id#14L = 11)
+- Relation[fieldA#12, fieldB#13,id#14L] parquet
== Optimized Logical Plan ==
Filter (isnotnull(id#14L) && (id#14L = 11))
+- Relation[fieldA#12, fieldB#13,id#14L] parquet
== Physical Plan ==
*(1) Project [fieldA#12, fieldB#13,id#14L]
+- *(1) Filter (isnotnull(id#14L) && (id#14L = 11))
+- *(1) FileScan parquet [fieldA#12,fieldB#13,id#14L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://mybucket/path/to/data], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,11)], ReadSchema: struct<fieldA:int,fieldB:string,id:bigint>
I also enabled logging and could see that the footer of each individual file is read to get its metadata. There are 10,000 files in this "directory" in S3, so retrieving all of that metadata takes a lot of time.
Why is Spark not getting the metadata from the _metadata file? Is there an option to enable this? I have already tried the following options:
spark.conf.set("parquet.summary.metadata.level","ALL")
spark.conf.set("parquet.filter.statistics.enabled","true")
spark.conf.set("parquet.filter.dictionary.enabled","true")
spark.conf.set("spark.sql.parquet.filterPushdown","true")
spark.conf.set("spark.sql.hive.convertMetastoreParquet","true")
spark.conf.set("spark.sql.parquet.respectSummaryFiles","true")
spark.conf.set("spark.sql.parquet.mergeSchema","false")
spark.conf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema","false")
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
Parquet summary files were deemed practically useless, and writing them was disabled by default in SPARK-15719. The reasoning in that JIRA suggests that summary files were only ever used for reading the schema, not for other metadata such as the min/max statistics that could be useful for filtering. I can't confirm whether that's actually the case, but here is an excerpt from that reasoning:
Parquet summary files are not particular useful nowadays since
- when schema merging is disabled, we assume schema of all Parquet part-files are identical, thus we can read the footer from any part-files.
- when schema merging is enabled, we need to read footers of all files anyway to do the merge.
According to this excerpt, the need to read every file footer may also be caused by schema merging being enabled, although if the summary files really are used only for the schema, then the file footers would have to be read anyway.
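If the per-file footer reads you are seeing happen while Spark is inferring the schema, one thing worth trying is to supply the schema yourself and keep schema merging off for that read, so the driver has no reason to open footers just to work out the schema. A minimal sketch, assuming the schema shown in your physical plan (fieldA: int, fieldB: string, id: bigint):

import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

// Schema taken from the ReadSchema shown in the physical plan above
val schema = StructType(Seq(
  StructField("fieldA", IntegerType),
  StructField("fieldB", StringType),
  StructField("id", LongType)
))

val mydata = spark.read
  .schema(schema)                 // skip schema inference from the file footers
  .option("mergeSchema", "false") // keep schema merging off for this read
  .parquet("s3a://.../mylocation")

val result = mydata.filter(mydata("id") === 11)

Note that the executors will still read each file's footer when they actually scan it (that is where the min/max statistics used for row-group skipping live), so this only removes footer reads done at planning time.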
If querying by ID is a frequent operation for you, you may consider partitioning your table by ID to avoid reading files unnecessarily.
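As a sketch of what that could look like (the id_bucket column and the output path are illustrative names, not part of your setup; for a high-cardinality id you would normally partition on a derived bucket like this rather than on the raw value):

import org.apache.spark.sql.functions.col

// Write the data partitioned by a derived bucket of the id
mydata
  .withColumn("id_bucket", col("id") % 100)
  .write
  .partitionBy("id_bucket")
  .parquet("s3a://.../mylocation_partitioned")

// A filter on the partition column becomes a PartitionFilter,
// so only the matching directory is listed and read
val partitioned = spark.read.parquet("s3a://.../mylocation_partitioned")
val result = partitioned.filter(col("id_bucket") === (11 % 100) && col("id") === 11)

This trades a one-time rewrite of the data for a partition-pruned read on every id lookup.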