Search code examples
jsonpysparkgoogle-bigqueryapache-spark-sqlspark-bigquery-connector

Unable to read bigquery table with JSON/RECORD column type into spark dataframe. ( java.lang.IllegalStateException: Unexpected type: JSON)


we are trying to read a table from Bigquery to spark dataframe.

Strucute of the table is simple_table

Following pyspark code is used for reading the data.

    from google.oauth2 import service_account
    from google.cloud import bigquery
    import json
    import base64 as bs
    from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, DecimalType
    
    schema = "schema_name"
    project_id = "project_id"
    
    table_name = "simple"
    # table_name = "jsonres"
    schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)
    credentials_dict = {"Insert_actual_credentials": "here"}

    credentials = service_account.Credentials.from_service_account_info(credentials_dict)
    client = bigquery.Client(credentials=credentials, project=project_id)
    
    query = "SELECT * FROM `{}`;".format(schema_table_name)
    # print(query)
    query_job = client.query(query)
    query_job.result()
    
    s = json.dumps(credentials_dict)
    res = bs.b64encode(s.encode('utf-8'))
    ans = res.decode("utf-8")
    
    try:
        df = spark.read.format('bigquery') \
            .option("credentials", ans) \
            .option("parentProject", project_id) \
            .option("project", project_id) \
            .option("mode", "DROPMALFORMED") \
            .option('dataset', query_job.destination.dataset_id) \
            .load(query_job.destination.table_id)
        df.printSchema()
        print(df)
        df.show()
    except Exception as exp:
        print(exp)

For simple tables, we are able to read table as dataframe successfully.

But when we have json column in the big query table as given below, we are getting error. json_col_table

We are getting the following error.

An error occurred while calling o1138.load. : java.lang.IllegalStateException: Unexpected type: JSON at com.google.cloud.spark.bigquery.SchemaConverters.getStandardDataType(SchemaConverters.java:355) at com.google.cloud.spark.bigquery.SchemaConverters.lambda$getDataType$3(SchemaConverters.java:303)

We also tried by providing schema while reading the data.

structureSchema = StructType([ \
        StructField('x', StructType([
             StructField('name', StringType(), True)
             ])),
    StructField("y", DecimalType(), True) \
  ])
print(structureSchema)

try:
    df = spark.read.format('bigquery') \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("project", project_id) \
        .option("mode", "DROPMALFORMED") \
        .option('dataset', query_job.destination.dataset_id) \
        .schema(structureSchema) \
        .load(query_job.destination.table_id)
    df.printSchema()
    print(df)
    df.show()
except Exception as exp:
    print(exp)

Still we faced the same error 'java.lang.IllegalStateException: Unexpected type: JSON'.

How to read bigquery table with json type into spark dataframe?

Update 1: There is an open issue in github regarding this.

While reading a bigquery table, having a JSON type field from Apache Spark throws exception.

Is there any workaround for this?


Solution

  • There is a corresponding Github issue for this in bigquery connector repository.

    While reading a bigquery table, having a JSON type field from Apache Spark throws exception. · Issue #804 · GoogleCloudDataproc/spark-bigquery-connector

    They are saying issue is fixed in 0.28.0 and above versions.

    Commit containing their fix: json related changes

    #804 · abhijeet-lele/spark-bigquery-connector@0c7de63

    commit_Screenshot

    Maven Repository: com.google.cloud.spark » spark-bigquery maven_ss