Tags: apache-spark, parquet, orc, schema-migration

Schema evolution of complex types


What is the status of schema evolution for arrays of structs (complex types) in Spark?

I know that schema evolution for regular simple types (adding a new column) works rather well with both ORC and Parquet, but I could not find any documentation so far for my desired case.

My use case is to have a structure similar to this one:

user_id,date,[{event_time, foo, bar, baz, tag1, tag2, ... future_tag_n}, ...]

And I want to be able to add new fields to the struct in the array.

Would a Map (key-value) complex type instead cause any inefficiencies? With a Map I would at least be sure that adding new fields (tags) stays flexible.
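
For illustration, the tags could be modelled as a map instead of an ever-widening struct; a rough sketch (the case class and field names here are made up):

case class EventMap(event_time: String, tags: Map[String, String])
case class FooMap(user_id: Int, date: String, events: Seq[EventMap])

// note: with a map, all tag values share a single value type
val dfMap = Seq(FooMap(1, "2019-01-01", Seq(EventMap("2019-01-01 10:00", Map("tag1" -> "a", "tag2" -> "b"))))).toDF
dfMap.printSchema // events: array<struct<event_time:string,tags:map<string,string>>>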

Edit

case class BarFirst(baz:Int, foo:String)
case class BarSecond(baz:Int, foo:String, moreColumns:Int, oneMore:String)
case class BarSecondNullable(baz:Int, foo:String, moreColumns:Option[Int], oneMore:Option[String])
case class Foo(i:Int, date:String, events:Seq[BarFirst])
case class FooSecond(i:Int, date:String, events:Seq[BarSecond])
case class FooSecondNullable(i:Int, date:String, events:Seq[BarSecondNullable])
val dfInitial = Seq(Foo(1, "2019-01-01", Seq(BarFirst(1, "asdf")))).toDF
dfInitial.printSchema
dfInitial.show

root
 |-- i: integer (nullable = false)
 |-- date: string (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = false)
 |    |    |-- foo: string (nullable = true)


scala> dfInitial.show
+---+----------+----------+
|  i|      date|    events|
+---+----------+----------+
|  1|2019-01-01|[[1,asdf]]|
+---+----------+----------+

dfInitial.write.partitionBy("date").parquet("my_df.parquet")

tree my_df.parquet
my_df.parquet
├── _SUCCESS
└── date=2019-01-01
    └── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet


val evolved = Seq(FooSecond(2, "2019-01-02", Seq(BarSecond(1, "asdf", 11, "oneMore")))).toDF
evolved.printSchema
evolved.show

scala> evolved.printSchema
root
 |-- i: integer (nullable = false)
 |-- date: string (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = false)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- moreColumns: integer (nullable = false)
 |    |    |-- oneMore: string (nullable = true)


scala> evolved.show
+---+----------+--------------------+
|  i|      date|              events|
+---+----------+--------------------+
|  2|2019-01-02|[[1,asdf,11,oneMo...|
+---+----------+--------------------+

import org.apache.spark.sql._
evolved.write.mode(SaveMode.Append).partitionBy("date").parquet("my_df.parquet")
tree my_df.parquet
my_df.parquet
├── _SUCCESS
├── date=2019-01-01
│   └── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet
└── date=2019-01-02
    └── part-00000-64e65d05-3f33-430e-af66-f1f82c23c155.c000.snappy.parquet

val df = spark.read.parquet("my_df.parquet")
df.printSchema
scala> df.printSchema
root
 |-- i: integer (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |-- date: date (nullable = true)

The additional columns are missing. Why?

df.show
df.as[FooSecond].collect // AnalysisException: No such struct field moreColumns in baz, foo
df.as[FooSecondNullable].collect // AnalysisException: No such struct field moreColumns in baz, foo

This behaviour was observed with Spark 2.2.3 (Scala 2.11) and 2.4.2 (Scala 2.12).


Solution

  • When executing the code from the edit above, schema merging is off by default and the new columns are not loaded. When enabling schema merging:

    val df = spark.read.option("mergeSchema", "true").parquet("my_df.parquet")
    scala> df.printSchema
    root
     |-- i: integer (nullable = true)
     |-- events: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- baz: integer (nullable = true)
     |    |    |-- foo: string (nullable = true)
     |    |    |-- moreColumns: integer (nullable = true)
     |    |    |-- oneMore: string (nullable = true)
     |-- date: date (nullable = true)
    

    df.as[FooSecond].collect // fails with a NullPointerException: the new fields must be declared as Option
    df.as[FooSecondNullable].collect // works fine
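
    Instead of passing the option on every read, Parquet schema merging can also be enabled session-wide via the spark.sql.parquet.mergeSchema setting (keep in mind that Spark documents schema merging as a relatively expensive operation):

    // enable Parquet schema merging for the whole session instead of per read
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")
    val dfMerged = spark.read.parquet("my_df.parquet") // now also picks up moreColumns and oneMore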

    Now using Hive:

    evolved.write.mode(SaveMode.Append).partitionBy("date").saveAsTable("my_df")
    

    This seems to work fine (no exception), but when trying to read the data back in:

    spark.sql("describe my_df").show(false)
    +-----------------------+---------------------------------+-------+
    |col_name               |data_type                        |comment|
    +-----------------------+---------------------------------+-------+
    |i                      |int                              |null   |
    |events                 |array<struct<baz:int,foo:string>>|null   |
    |date                   |string                           |null   |
    |# Partition Information|                                 |       |
    |# col_name             |data_type                        |comment|
    |date                   |string                           |null   |
    +-----------------------+---------------------------------+-------+
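
    One way to check whether the new columns ever reached the data files is to read them directly, bypassing the metastore schema (a sketch; the default spark-warehouse location of a managed table is assumed):

    // read the table's underlying Parquet files directly, ignoring the Hive schema
    val raw = spark.read.option("mergeSchema", "true").parquet("spark-warehouse/my_df")
    raw.printSchema // does the file-level schema contain moreColumns / oneMore?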
    

    When using only basic types instead of an array of structs:
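
    (Foo and FooEvolved here are not the case classes from the edit above; judging from the error message below, they presumably look like this:)

    // presumed definitions for the basic-type experiment, inferred from the error message
    case class Foo(first: Int, dt: String)
    case class FooEvolved(first: Int, second: Int, dt: String)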

    val first = Seq(Foo(1, "2019-01-01")).toDF
    first.printSchema
    first.write.partitionBy("dt").saveAsTable("df")
    val evolved = Seq(FooEvolved(1,2, "2019-01-02")).toDF
    evolved.printSchema
    evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")
    org.apache.spark.sql.AnalysisException: The column number of the existing table default.df(struct<first:int,dt:string>) doesn't match the data schema(struct<first:int,second:int,dt:string>);
    

    There is a clear error message. Question: is it still possible to evolve the schema in Hive, or is a manual adaptation of the schema required?
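
    For the basic-type case, one possible manual adaptation is to widen the table schema before appending (a sketch; Spark supports ALTER TABLE ... ADD COLUMNS for data source tables since 2.2):

    // manually add the new column to the table, then retry the append from above
    spark.sql("ALTER TABLE df ADD COLUMNS (second INT)")
    evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")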

    Conclusion

    Schema evolution for arrays of structs is supported, but the mergeSchema option must be enabled when reading the files, and it seems to work out of the box only when reading the files directly, without Hive.

    When reading through Hive, only the old schema is returned, as the new columns seem to be dropped silently when writing.

    Handling schema evolution manually with views on top of the Parquet files looks like an interesting alternative: it also allows evolutions that Parquet itself does not support (renames, data type changes), it avoids the rather resource-heavy mergeSchema option, and it works for all SQL engines on Hadoop.
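
    For the basic-type example, such a view could look roughly like this (a sketch, assuming the two schema generations live in separate tables df_v1 and df_v2):

    // sketch: one view presenting a unified schema over two generations of data
    spark.sql("""
      CREATE OR REPLACE VIEW df_unified AS
      SELECT first, CAST(NULL AS INT) AS second, dt FROM df_v1
      UNION ALL
      SELECT first, second, dt FROM df_v2
    """)
    spark.sql("SELECT * FROM df_unified").show()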