Background: I am building a JSON parser that can take in any format of JSON and write it to a Delta table with a schema that can update based on new data / new columns that come in.
I am trying to update an existing Databricks Delta table (the target) with an incoming JSON file (the source) using a MERGE INTO operation in Databricks.
As an example, let's say the existing table looks like this:
Target (existing Delta table)
Nested | ID | Fee |
---|---|---|
[{"code": null, "value": "11"}, {"code": null, "value": "16"}] | 47 | 57.00 |
null | 48 | null |
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 51 | null |
Source (incoming JSON)
Nested | ID | Fee |
---|---|---|
null | 47 | 36.00 |
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 52 | 17.00 |
Desired result (the row for ID 47 is upserted with Nested now null and Fee now 36.00, and a new row for ID 52 is inserted at the end):
Nested | ID | Fee |
---|---|---|
null | 47 | 36.00 |
null | 48 | null |
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 51 | null |
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 52 | 17.00 |
Right now, I am generating a SET string dynamically based on the target and source schemas (they are hundreds of columns long in many cases, and this parser is generic across 70+ different JSON schemas). The target schema comes from the existing Delta table, and the source schema comes from a DataFrame that has read in the JSON source. The source schema can contain columns that weren't previously in the target schema, and I want those columns added to the table, which works when setting spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true"). The problem is that columns which are entirely null cannot be automatically cast to the complex JSON structure, to my knowledge.
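For reference, a minimal sketch of how the dynamic strings can be generated from the source DataFrame's columns (the helper name, the source_df variable, and the exact INSERT format are illustrative, not my production code):

```python
# Build the UPDATE SET and INSERT clauses from whatever columns the source carries.
def build_merge_strings(source_df):
    cols = source_df.columns
    set_string = ", ".join(f"t.{c} = s.{c}" for c in cols)
    set_string_insert = "(" + ", ".join(cols) + ") VALUES (" + ", ".join(f"s.{c}" for c in cols) + ")"
    return set_string, set_string_insert

set_string, set_string_insert = build_merge_strings(source_df)

# Allow the MERGE to add source columns that do not yet exist in the target.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```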
My current MERGE INTO statement with dynamic SET strings:
```sql
MERGE INTO {existing_table_name} t
USING {new_source_json} s
ON t.ID = s.ID
WHEN MATCHED THEN
  UPDATE SET {set_string}
WHEN NOT MATCHED THEN
  INSERT {set_string_insert}
```
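This statement is run from PySpark after the source DataFrame is registered as a temporary view; a minimal sketch of the execution, assuming the view name and f-string templating shown here (my actual helper differs):

```python
# Expose the source DataFrame to SQL so the MERGE can reference it by name.
source_df.createOrReplaceTempView("new_source_json")

spark.sql(f"""
    MERGE INTO {existing_table_name} t
    USING new_source_json s
    ON t.ID = s.ID
    WHEN MATCHED THEN UPDATE SET {set_string}
    WHEN NOT MATCHED THEN INSERT {set_string_insert}
""")
```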
I am getting the following error:

```
Failed to merge incompatible data types ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true) and StringType
```

This is because the target table's Nested column type is array<struct<code:string,value:string>>, but the all-null Nested value being used to update ID 47 is typed as a plain string by Spark.
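As a quick illustration of where that StringType comes from (illustrative only, not part of my pipeline): when every value of a JSON field is null, Spark's schema inference has nothing to go on and defaults the field to a string.

```python
# An all-null field is inferred as StringType by spark.read.json.
sample = spark.sparkContext.parallelize(['{"ID": 47, "Fee": 36.0, "Nested": null}'])
spark.read.json(sample).printSchema()
# Nested comes out as string (nullable = true), not array<struct<code:string,value:string>>
```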
My generated set_strings look like the following:
```
t.nested = s.nested, t.ID = s.ID, t.Fee = s.Fee
```
I have also tried to cast the column within the MERGE INTO statement itself:
```sql
MERGE INTO {existing_table_name} t
USING {new_source_json} s
ON t.ID = s.ID
WHEN MATCHED THEN
  UPDATE SET t.Nested = cast(s.Nested as array<struct<code:string,value:string>>), t.ID = s.ID, t.Fee = s.Fee
WHEN NOT MATCHED THEN
  INSERT (Nested, ID, Fee)
  VALUES (cast(s.Nested as array<struct<code:string,value:string>>), s.ID, s.Fee)
```
This still produces the same error. Do I have to cast the source DataFrame columns before merging into the target? That would be rather tedious and expensive for 200+ columns, using the withColumn function one by one on every mismatched type.
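For reference, if aligning the source DataFrame did turn out to be required, it could at least be expressed as a single select driven by the target schema rather than 200+ withColumn calls. A rough sketch (source_df and the alignment rule are illustrative), under the assumption that a string-typed source column whose target type is complex is really just the all-null case:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, StructType

# Illustrative only: swap in a typed null literal wherever the JSON reader could
# only infer a plain string for a column that is a complex type in the target
# (i.e. the column was entirely null in this file); leave every other column as-is.
target_types = {f.name: f.dataType for f in spark.table(existing_table_name).schema.fields}

def align_column(field):
    tgt = target_types.get(field.name)
    if tgt is not None and isinstance(field.dataType, StringType) \
            and isinstance(tgt, (ArrayType, StructType, MapType)):
        return F.lit(None).cast(tgt).alias(field.name)
    return F.col(field.name)

aligned_df = source_df.select([align_column(f) for f in source_df.schema.fields])
```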
Is there a way to accomplish the desired result in Databricks using MERGE INTO or any other operation? I would prefer to avoid creating the schemas manually unless that is the only route forward.
I ended up working around the merge issue by instead using the existing table's schema to load in the new JSON file. If there was a difference in the new schema (meaning either a column data type changed, or a new column was added to the JSON), I would then have to enable schema evolution to ensure the new column was captured. A code snippet for reading in the new JSON file with the schema taken from the existing table (the passed_schema variable):
```python
old_table = spark.table(f"{full_table_name}")
passed_schema = old_table.schema

def read_from_blob_as_pyspark_df_json_with_schema(file_path, container_name, storage_account_name, schema):
    # Enforce the existing table's schema instead of letting Spark infer one from the JSON.
    spark_df = spark.read.format('json') \
        .schema(schema) \
        .load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_path}")
    return spark_df

raw_json_df = read_from_blob_as_pyspark_df_json_with_schema(f, container_name, storage_account_name, passed_schema)
```
Using the passed_schema along with the new raw_json_df, you can then perform the merge statement. Again, if there are mismatches, you can either add the new columns manually or have another function enable schema evolution for the new columns. A minimal sketch of that final merge is shown below.
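Putting it together, a minimal sketch of that merge with the schema-enforced DataFrame (the temporary view name and the {set_string} / {set_string_insert} placeholders are the same illustrative ones as above):

```python
# Because raw_json_df was read with the target's schema, the all-null Nested column
# keeps its array<struct<code:string,value:string>> type instead of being inferred
# as a string, so the MERGE no longer hits the incompatible-type error.
raw_json_df.createOrReplaceTempView("new_source_json")

spark.sql(f"""
    MERGE INTO {full_table_name} t
    USING new_source_json s
    ON t.ID = s.ID
    WHEN MATCHED THEN UPDATE SET {set_string}
    WHEN NOT MATCHED THEN INSERT {set_string_insert}
""")
```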