I'm trying to load and analyze some parquet files with Spark.
I'm using the mergeSchema option to load the files, since newer files have some extra columns. Also, some files have their column names in lower case and others in upper case.
For example, file1.parquet has a schema like:

column1 integer,
column2 integer

and file2.parquet has something like:

Column1 integer,
Column2 integer,
Column3 integer
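
Roughly, I'm loading them like this (the path is just a placeholder):

val df = spark.read
  .option("mergeSchema", "true")
  .parquet("/path/to/parquet/files")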
I'm running into an issue with the inferSchema method of the ParquetFileFormat class. Schema merging is delegated to the merge method of Spark SQL's StructType. From what I can tell, that method only works in a case-sensitive way: internally it uses a map to look up fields by name, and when the cases don't match it treats the field as a new one. Later, when the schema is checked for duplicates, the case-sensitivity configuration is respected and we end up with duplicate columns. This results in:
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
Is there any way to make schema merging case-insensitive? I was expecting to get something like this as the resulting schema:
column1 integer,
column2 integer,
Column3 integer
You can set spark.sql.caseSensitive=true in your configuration to make Spark SQL schemas case-sensitive. This also affects schema merging: the case variants are kept as separate columns instead of triggering the duplicate-column error, and you can then collapse them yourself with coalesce:
scala> spark.conf.set("spark.sql.caseSensitive","true")
scala> val df = sc.parallelize(1 to 1000).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.withColumnRenamed("value","VALUE").write.parquet("test_uc")
scala> df.write.parquet("test_lc")
scala> val df2=spark.read.option("mergeSchema","true").parquet("test_*")
df2: org.apache.spark.sql.DataFrame = [value: int, VALUE: int]
scala> val merged = df2.columns.groupBy(_.toLowerCase).
     |   map(t => coalesce(t._2.map(col):_*).as(t._1)).
     |   toArray
merged: Array[org.apache.spark.sql.Column] = Array(coalesce(value, VALUE) AS `value`)
scala> df2.select(merged:_*)
res2: org.apache.spark.sql.DataFrame = [value: int]
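
If you'd rather keep the original casing of the first occurrence of each column name (so Column3 could stay as Column3, matching the schema you were expecting), alias with the first name in each group instead; a small variation on the same idea, where which variant "wins" depends on the order of the merged schema:

val mergedKeepCase = df2.columns.groupBy(_.toLowerCase)
  .map(t => coalesce(t._2.map(col):_*).as(t._2.head))
  .toArray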
scala> spark.conf.set("spark.sql.caseSensitive","false")
// process your dataframe
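
Outside of spark-shell you need the imports yourself. Here's a minimal sketch of the same approach as a standalone job; the path, app name, and session setup are assumptions, not part of the original answer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

object MergeCaseInsensitiveParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("merge-parquet").getOrCreate()

    // keep case variants as separate columns while the schemas are merged
    spark.conf.set("spark.sql.caseSensitive", "true")

    // placeholder path
    val df = spark.read.option("mergeSchema", "true").parquet("/path/to/parquet/files")

    // collapse the case variants of each column into one
    val merged = df.columns.groupBy(_.toLowerCase)
      .map(t => coalesce(t._2.map(col):_*).as(t._1))
      .toArray
    val result = df.select(merged:_*)

    // restore the default before further processing
    spark.conf.set("spark.sql.caseSensitive", "false")

    result.printSchema()
    spark.stop()
  }
}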