apache-spark, parquet

Case sensitive parquet schema merge in Spark


I'm trying to load and analyze some parquet files with Spark. I'm using the mergeSchema option to load the files, since newer files have some extra columns. Also, some files have their column names in lower case and others in upper case.

For example

file1.parquet has a schema like

column1 integer,
column2 integer

and file2.parquet has something like:

Column1 integer,
Column2 integer,
Column3 integer

I'm running into an issue with the inferSchema method of the ParquetFileFormat class. Schema merging is delegated to Spark SQL's StructType merge method. From what I can tell, that method only works case-sensitively: internally it uses a map to look up fields by name, so when the cases don't match it treats the field as a new one. Later, when the schema is checked for duplicates, the case sensitivity configuration is respected, and we end up with duplicate columns. This results in

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
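For reference, here is a minimal way to reproduce this in spark-shell (a sketch, assuming the default spark.sql.caseSensitive=false; file1 and file2 are placeholder paths, and the exact exception text varies by Spark version):

    scala> val df = sc.parallelize(1 to 10).toDF("column1")
    df: org.apache.spark.sql.DataFrame = [column1: int]

    scala> df.write.parquet("file1")

    scala> df.withColumnRenamed("column1","Column1").write.parquet("file2")

    scala> spark.read.option("mergeSchema","true").parquet("file1", "file2")
    org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: ...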

Is there any way to make schema merge case insensitive?

I was expecting to get something like this as the resulting schema:

column1 integer,
column2 integer,
Column3 integer

Solution

  • You can set spark.sql.caseSensitive=true in your configuration to make Spark SQL resolve schemas case-sensitively. This also applies during schema merging, so columns that differ only in case are kept as separate fields instead of being reported as duplicates; you can then collapse each group of same-named columns with coalesce:

    scala> spark.conf.set("spark.sql.caseSensitive","true")
    
    scala> val df = sc.parallelize(1 to 1000).toDF()
    df: org.apache.spark.sql.DataFrame = [value: int]
    
    scala> df.withColumnRenamed("value","VALUE").write.parquet("test_uc")
    
    scala> df.write.parquet("test_lc")
    
    scala> val df2 = spark.read.option("mergeSchema","true").parquet("test_*")
    df2: org.apache.spark.sql.DataFrame = [value: int, VALUE: int]
    
    scala> val merged = df2.columns.groupBy(_.toLowerCase)
                       .map(t => coalesce(t._2.map(col):_*).as(t._1))
                       .toArray
    merged: Array[org.apache.spark.sql.Column] = Array(coalesce(value, VALUE) AS `value`)
    
    scala> df2.select(merged:_*)
    res2: org.apache.spark.sql.DataFrame = [value: int]
    
    scala> spark.conf.set("spark.sql.caseSensitive","false")
    
    // process your dataframe
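
  • To avoid leaving case sensitivity enabled globally, you could wrap the same pattern in a small helper that restores the previous setting afterwards. This is just a sketch (readMergedCaseInsensitive is a hypothetical name, not a Spark API):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{coalesce, col}

    def readMergedCaseInsensitive(spark: SparkSession, paths: String*): DataFrame = {
      val previous = spark.conf.get("spark.sql.caseSensitive")
      // Merge case-sensitively so same-name/different-case columns stay separate
      spark.conf.set("spark.sql.caseSensitive", "true")
      try {
        val df = spark.read.option("mergeSchema", "true").parquet(paths: _*)
        // Collapse each case-insensitive group of columns into one via coalesce
        val merged = df.columns.groupBy(_.toLowerCase)
          .map { case (name, variants) => coalesce(variants.map(col): _*).as(name) }
          .toArray
        // select is analyzed eagerly, so resolution happens while the flag is on
        df.select(merged: _*)
      } finally {
        spark.conf.set("spark.sql.caseSensitive", previous) // restore prior value
      }
    }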