(Spark 2.0.2)
The problem here arises when you have parquet files with different schemas and you force a schema during read. Even though you can print the schema and run show() fine, you cannot apply any filtering logic on the missing columns.
Here are the two example schemata:
// assuming you are running this code in a spark REPL
import spark.implicits._
case class Foo(i: Int)
case class Bar(i: Int, j: Int)
So Bar includes all the fields of Foo and adds one more (j). In real life this arises when you start with schema Foo and later decide that you need more fields, ending up with schema Bar.
Let's simulate the two different parquet files.
// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")
What we want here is to always read the data using the more generic schema, Bar. That is, rows written with schema Foo should have j set to null.
case 1: We read a mix of both schemas
spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
| i| j|
+---+----+
| 1| 2|
| 1|null|
+---+----+
spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
| i| j|
+---+---+
| 1| 2|
+---+---+
case 2: We only have Bar data
spark.read.parquet("/tmp/bar").show()
+---+---+
| i| j|
+---+---+
| 1| 2|
+---+---+
case 3: We only have Foo data
scala> spark.read.parquet("/tmp/foo").show()
+---+
| i|
+---+
| 1|
+---+
The problematic case is 3, where the resulting schema is of type Foo and not of Bar. Since we are migrating to schema Bar, we want to always get schema Bar back from our data (old and new).
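A quick way to see this is printSchema(): consistent with the show() output above, the Foo-only read comes back with just the i column, while the Bar read has both.
// compare the schemas Spark infers for the two directories
spark.read.parquet("/tmp/foo").printSchema() // only: i
spark.read.parquet("/tmp/bar").printSchema() // i and j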
The suggested solution is to define the schema programmatically so that it is always Bar. Let's see how to do this:
val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false))
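As a side note, if the Bar case class is not available where you do the read (say, in a generic ingestion job), an equivalent schema can be assembled by hand. This is just a sketch of the same StructType, except that it marks both fields as nullable from the start:
// hand-written equivalent of the Encoders-derived schema, with nullable fields
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val barSchemaManual = StructType(Seq(
  StructField("i", IntegerType, nullable = true),
  StructField("j", IntegerType, nullable = true)))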
Running show() works great:
scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
| i| j|
+---+----+
| 1|null|
+---+----+
However, if you try to filter on the missing column j, things fail.
scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
What worked for me is to use the createDataFrame API with an RDD[Row] and the new schema (with at least the new columns being nullable).
// Make the columns nullable (probably you don't need to make them all nullable)
val barSchemaNullable = org.apache.spark.sql.types.StructType(
barSchema.map(_.copy(nullable = true)).toArray)
// We create the df (but this is not what you want to use, since it still has the same issue)
val df = spark.read.schema(barSchemaNullable).parquet("/tmp/foo")
// Here is the final call that gives a working DataFrame
val fixedDf = spark.createDataFrame(df.rdd, barSchemaNullable)
fixedDf.filter($"j".isNotNull).show()
+---+---+
| i| j|
+---+---+
+---+---+
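To avoid repeating this dance on every read, the workaround above can be wrapped in a small helper. This is only a sketch based on the steps we just went through; readWithSchema is a hypothetical name, and going through df.rdd means these reads give up parquet predicate pushdown:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

// Read parquet files written with either the old or the new schema and always
// return a DataFrame with the (nullable) new schema that is safe to filter on.
def readWithSchema(spark: SparkSession, schema: StructType, paths: String*): DataFrame = {
  val nullableSchema = StructType(schema.map(_.copy(nullable = true)).toArray)
  val df = spark.read.schema(nullableSchema).parquet(paths: _*)
  // Rebuilding from the RDD keeps filters on missing columns from being
  // pushed down to the parquet reader, which is what blew up earlier.
  spark.createDataFrame(df.rdd, nullableSchema)
}

// usage
readWithSchema(spark, barSchema, "/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()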