Tags: apache-spark, dataframe, amazon-s3, parquet

How to load mixed Parquet schema into DataFrame using Apache Spark?


I have a Spark job that continuously uploads Parquet files to S3 (with partitions).
The files all share the same Parquet schema.

One of the field types was recently changed (from String to long), so the Parquet schema of some partitions is now mixed.
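Each individual Parquet file still has exactly one consistent schema, so inspecting the footers file by file shows which files store the field as String and which as long. A hypothetical sketch, with my_field standing in for the changed field and made-up file paths:

    // Hypothetical file paths; list the real ones from the partition directories.
    val oldFile = "s3a://data/parquet/partition_by_day=20180619/partition_by_hour=10/part-00000.snappy.parquet"
    val newFile = "s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000.snappy.parquet"

    Seq(oldFile, newFile).foreach { f =>
      // .schema only reads the footer, so this works even on conflicting files
      println(s"$f -> ${sqlContext.read.parquet(f).schema("my_field").dataType}")
    }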

Partitions containing data of both types now fail to read some of the content.
While sqlContext.read.load(path) appears to succeed, any fetch operation on the resulting DataFrame (collect, for example) fails with a ParquetDecodingException.
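This is a consequence of lazy evaluation, illustrated below with the paths from the trace: load only consults the file footers, so the type conflict surfaces only once row data is actually decoded.

    val df = sqlContext.read.load("s3a://data/parquet/")
    df.printSchema()   // still fine: only footers were read, no rows decoded

    // Any action that materializes rows decodes the conflicting files and
    // throws ParquetDecodingException (caused by the ClassCastException below).
    df.collect()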

I intend to migrate the data and reformat it, but I cannot read the mixed content into a DataFrame in the first place.
How can I load the mixed partitions into DataFrames, or any other Spark construct, using Apache Spark?

Following is the ParquetDecodingException trace:

scala> df.collect
[Stage 1:==============>        (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 1 in block 0 in file 
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)

Solution

  • As far as I know, you cannot mix two schemas that define the same field with different types. Therefore the only solution I can think of is the following (a sketch follows the list):

    1. List the files of the affected partition.

    2. Re-write each file to a new location, transforming the data to the right schema.

    3. If the original data was partitioned, another pass is required to restore the partitioning, because re-writing the data file-by-file discards the partition layout.

    4. Check that the new partition reads back with the right schema.

    5. Remove the "bad" partition and copy the tmp partition in its place.
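
A minimal Scala sketch of steps 1-4, assuming a hypothetical column name my_field for the field whose type changed, and the partition layout from the stack trace. Reading one file at a time sidesteps the schema conflict, since a single Parquet file always has one consistent schema, and Spark's basePath read option recovers the partition columns from each file's path, so a single partitionBy write restores the layout:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.functions.col

    val sourceRoot = "s3a://data/parquet"       // original data
    val tmpRoot    = "s3a://data/parquet_tmp"   // temporary re-written copy
    val partition  = s"$sourceRoot/partition_by_day=20180620/partition_by_hour=10"

    // 1. List the individual Parquet files of the affected partition.
    val conf  = sqlContext.sparkContext.hadoopConfiguration
    val fs    = new Path(partition).getFileSystem(conf)
    val files = fs.listStatus(new Path(partition))
      .map(_.getPath.toString)
      .filter(_.endsWith(".parquet"))

    // 2.-3. Re-write each file with the field cast to the target type;
    // the cast is a no-op for files that already store the field as long.
    files.foreach { file =>
      sqlContext.read
        .option("basePath", sourceRoot)   // recover partition columns from the path
        .parquet(file)
        .withColumn("my_field", col("my_field").cast("long"))
        .write
        .mode("append")
        .partitionBy("partition_by_day", "partition_by_hour")
        .parquet(tmpRoot)
    }

    // 4. Verify the re-written data reads back with the right schema.
    sqlContext.read.parquet(tmpRoot).printSchema()
    sqlContext.read.parquet(tmpRoot).count()

Step 5, removing the bad partition and moving the temporary one into its place, is a plain S3 operation (for example with the AWS CLI) and is omitted from the sketch.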