I have a DataFrame that contains a JSON string in a StringType column. How can I write it to a BigQuery JSON column (a new feature)? The Spark BigQuery connector's GitHub page (https://github.com/GoogleCloudDataproc/spark-bigquery-connector) has no example of writing JSON to BigQuery.
When we try to write the string in append mode to a JSON column, we get this error:
Field source has changed type from JSON to STRING
The README covers this under Datatypes -> JSON, but how can it be implemented? An example would help.
How to use JSON with the Spark BigQuery Connector is described in its GitHub README:
"Spark has no JSON type. The values are read as String. In order to write JSON back to BigQuery, the following conditions are REQUIRED: (1) use the INDIRECT write method; (2) use the AVRO intermediate format; (3) the DataFrame field MUST be of type String and has an entry of sqlType=JSON in its metadata."
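For the situation in the question (a DataFrame that already holds JSON strings), condition (3) can be met by re-aliasing the existing column with the required metadata instead of rebuilding the DataFrame. A minimal sketch, assuming Spark 3.x with spark-sql on the classpath; the `JsonColumns` object and the `withJsonMetadata` helper are hypothetical names, while `Column.as(alias, metadata)` is the standard Spark call used to attach the metadata. Conditions (1) and (2) still have to be set as writer options.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StringType}

object JsonColumns {
  // The metadata entry the README requires on String fields bound for BigQuery JSON columns
  val JsonMetadata: Metadata = new MetadataBuilder().putString("sqlType", "JSON").build()

  /** Re-aliases each listed String column with sqlType=JSON metadata; other columns pass through. */
  def withJsonMetadata(df: DataFrame, jsonColumns: Set[String]): DataFrame = {
    jsonColumns.foreach { name =>
      // df.schema(name) throws IllegalArgumentException if the column does not exist
      require(df.schema(name).dataType == StringType, s"Column '$name' must be StringType")
    }
    // Keep the original column order; only the JSON columns get the extra metadata
    val projected = df.columns.toSeq.map { name =>
      if (jsonColumns.contains(name)) col(name).as(name, JsonMetadata) else col(name)
    }
    df.select(projected: _*)
  }
}
```

Only the schema metadata changes; the string values themselves are passed through untouched, so no parsing or re-serialisation of the JSON happens on the Spark side.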
I raised issues asking for clarification, which can be found here: #880 and #882.
In the Scala Spark example below, I have a table that contains two JSON columns, metadata and data. First we create table_schema, attaching the sqlType=JSON metadata to both columns, and then we apply that schema to the Spark RDD. With that in place, we can write directly to the JSON columns in BigQuery from Scala Spark.
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}

// Both JSON-bound columns are Spark Strings tagged with sqlType=JSON in their field metadata
val table_schema = new StructType()
  .add(StructField("metadata", StringType, true, Metadata.fromJson("""{"sqlType":"JSON"}""")))
  .add(StructField("data", StringType, true, Metadata.fromJson("""{"sqlType":"JSON"}""")))
Create the DataFrame to be written to BigQuery from the RDD and the schema above:
val df_bigquery = spark.createDataFrame(rowRDD, table_schema)
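The answer does not show how rowRDD is built. A minimal sketch, assuming rowRDD is an RDD[Row] whose two values are the JSON documents as plain strings; the `JsonRows` object, the `build` helper and its input records are hypothetical, and the schema mirrors the answer's table_schema.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{Metadata, StringType, StructField, StructType}

object JsonRows {
  private val jsonMeta: Metadata = Metadata.fromJson("""{"sqlType":"JSON"}""")

  // Mirrors table_schema: both fields are Strings tagged with sqlType=JSON
  val tableSchema: StructType = new StructType()
    .add(StructField("metadata", StringType, nullable = true, jsonMeta))
    .add(StructField("data", StringType, nullable = true, jsonMeta))

  /** Builds the DataFrame from hypothetical (metadata, data) JSON string pairs. */
  def build(spark: SparkSession, records: Seq[(String, String)]): DataFrame = {
    // Each Row carries the JSON documents as plain strings, in schema order
    val rowRDD: RDD[Row] =
      spark.sparkContext.parallelize(records.map { case (meta, data) => Row(meta, data) })
    // createDataFrame keeps the field metadata from tableSchema on the result
    spark.createDataFrame(rowRDD, tableSchema)
  }
}
```

Because the metadata lives on the schema rather than on the rows, any RDD[Row] with two string values per row can be paired with the same schema.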
Write to BigQuery using the Spark BigQuery Connector.
For JSON column ingestion, writeMethod must be indirect, with a temporaryGcsBucket set and intermediateFormat set to avro.
The target table details are provided in spark.datasource.bigquery.full_load_table.
Two modes are supported: spark.datasource.bigquery.write.mode can be either overwrite or append.
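df_bigquery.write
  .format("bigquery")
  // Indirect write stages the data in this GCS bucket before loading it into BigQuery
  .option("temporaryGcsBucket", "temp-bucket-name")
  .option("writeMethod", "indirect")
  // Avro staging is required for the sqlType=JSON metadata to map onto a JSON column
  .option("intermediateFormat", "avro")
  .mode("append") // "append" or "overwrite", per spark.datasource.bigquery.write.mode
  .option("table", "schema.table")
  .option("createDisposition", "CREATE_IF_NEEDED")
  .option("writeDisposition", "WRITE_TRUNCATE")
  .save()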
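The answer drives the target table and the mode from Spark configuration. A minimal sketch of that wiring, assuming spark.datasource.bigquery.full_load_table and spark.datasource.bigquery.write.mode are the answer's own job-level settings (not connector options) set before the job runs; the `BigQueryJsonWrite` object and its helpers are hypothetical, while `spark.conf.get`, `DataFrameWriter.options` and `SaveMode` are standard Spark APIs.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object BigQueryJsonWrite {
  /** Maps the configured write-mode string onto a Spark SaveMode (only the two modes named in the answer). */
  def resolveMode(value: String): SaveMode = value.trim.toLowerCase match {
    case "append"    => SaveMode.Append
    case "overwrite" => SaveMode.Overwrite
    case other       => throw new IllegalArgumentException(s"Unsupported write mode: $other")
  }

  /** Connector options needed for JSON columns: indirect write with avro staging in GCS. */
  def jsonWriteOptions(table: String, temporaryGcsBucket: String): Map[String, String] = Map(
    "table"              -> table,
    "writeMethod"        -> "indirect",
    "intermediateFormat" -> "avro",
    "temporaryGcsBucket" -> temporaryGcsBucket,
    "createDisposition"  -> "CREATE_IF_NEEDED"
  )

  /** Reads table and mode from Spark conf (assumed keys) and writes df to BigQuery. */
  def write(spark: SparkSession, df: DataFrame, temporaryGcsBucket: String): Unit = {
    // Throws NoSuchElementException if the target table is not configured
    val table = spark.conf.get("spark.datasource.bigquery.full_load_table")
    // Defaults to append when no mode is configured (an assumption of this sketch)
    val mode = resolveMode(spark.conf.get("spark.datasource.bigquery.write.mode", "append"))
    df.write
      .format("bigquery")
      .options(jsonWriteOptions(table, temporaryGcsBucket))
      .mode(mode)
      .save()
  }
}
```

Keeping the option map in one function makes it easy to check that every JSON write goes through the indirect/avro path, since a direct write is what the README rules out for JSON columns.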