I came across a problem while reading Parquet files through Spark. A Parquet file was written with field a of type Integer. Afterwards, reading this file with a schema that declares a as Long throws an exception:
Caused by: java.lang.UnsupportedOperationException: Unimplemented type: LongType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:397)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:199)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:263)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:161)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
I thought this compatible type change (upcasting Integer to Long) was supported, but it is not working. Code snippet that reproduces this:
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Write the data out with field "a" as Integer
val oldSchema = StructType(StructField("a", IntegerType, true) :: Nil)
val df1 = spark.read.schema(oldSchema).json("/path/to/json/data")
df1.write.parquet("/path/to/parquet/data")

// Read the same Parquet data back with field "a" as Long -- this throws the exception above
val newSchema = StructType(StructField("a", LongType, true) :: Nil)
spark.read.schema(newSchema).parquet("/path/to/parquet/data").show()
Any help with this is really appreciated.
Parquet is a column-based storage format for Hadoop, so it stores the data type along with the data. When the Parquet file is read back with a different data type, the conversion is not handled automatically, even when it is an upcast such as Integer to Long. You need to cast the data explicitly:
import org.apache.spark.sql.functions.col

val colarraywithcast = Array(col("eid"), col("did"), col("seal").cast(LongType))
df.select(colarraywithcast: _*).printSchema
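Applied to the schema from the question, a minimal sketch would be to read with the Integer type the file was actually written with and then cast column a to Long afterwards (the path and column name here are taken from the question's snippet, and a SparkSession named spark is assumed):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Read with the type the Parquet file was actually written with...
val writtenSchema = StructType(StructField("a", IntegerType, true) :: Nil)
val df = spark.read.schema(writtenSchema).parquet("/path/to/parquet/data")

// ...then upcast explicitly to get a Long column
val upcasted = df.select(col("a").cast(LongType).as("a"))
upcasted.printSchema()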