Tags: apache-spark, google-bigquery, schema, google-cloud-storage

One column with different schemas across files in a folder in Spark (PlainDoubleDictionary)


TL;DR
Data source: GCS
Target: BigQuery
Problem: a wildcard read pulls in multiple files that all have the same columns, but airport_fee is of integer type in some files and double in others
Error: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

I am using New York Taxi data for a project. All was going smoothly until I tried to load the data from GCS to BigQuery with Spark after applying some light transformations (type casting, renaming columns, and some filters).

The problem is a column called "airport_fee".

Apparently the column is of int type in some files and of double type in others.

Here is a sample:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

yellow_source = f"gs://{gcp_bucket}/yellow_trip_data/*"

spark = SparkSession \
    .builder \
    .master('yarn') \
    .config("spark.sql.files.ignoreCorruptFiles", "true") \
    .config("spark.sql.ansi.enabled", "true") \
    .appName('ny_taxi') \
    .getOrCreate()

df = spark.read.parquet(yellow_source) \
    .withColumn("airport_fee", F.col('airport_fee').cast('double'))

df.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .format("bigquery") \
    .option("temporaryGcsBucket", gcs_spark_bucket) \
    .option("dataset", staging_dataset) \
    .save("bqtb_stg_yellow")

I have hopelessly tried the above and many other variations (mergeSchema, overwriteSchema, ignoreCorruptFiles, schema(schema)), even when they didn't make sense. All of them failed.

I think I have only managed to make it work with one file at a time, but that would defeat the purpose of using Spark, wouldn't it? The files are sub-50 MB on average. (P.S. wrapping the code in a for loop fails after the first couple of files anyway.)

I have ended up dropping the column altogether, as I have spent way too much time trying to resolve this, but I do not feel good about that.

Any tips would be appreciated.

Full error:

Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary


Solution

  • I solved this long ago and just recently packaged the solution.

    pip install schemadiffed
    

    Context: when I made this post, I was working on ingesting data where all files had the same columns, but some columns had varying data types across files (int vs. double). Processing these files directly in Spark caused issues due to the schema inconsistencies, and loading them into BigQuery didn't work either.

    To circumvent this, I devised a solution that groups the files by schema. It reads the metadata of each file to extract its schema, then groups together the files that share one. From there I could just loop over each schema group and cast it to a unified schema at the destination.
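    The grouping step described above can be sketched like this (pyarrow assumed; schemadiffed's actual API may differ):

    ```python
    from collections import defaultdict

    import pyarrow.parquet as pq

    def group_files_by_schema(paths):
        """Group parquet files whose footers declare the identical schema."""
        groups = defaultdict(list)
        for path in paths:
            # the schema's canonical text form serves as the group key
            key = pq.read_schema(path).to_string()
            groups[key].append(path)
        return dict(groups)
    ```

    Each group can then be read with spark.read.parquet(*group), cast to the unified schema, and unioned or written on its own.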

    Unlikely as it is that someone else ever hits the same problem, I packaged this solution so they wouldn't have to walk around in circles for days like I did.