Tags: python, sql, pyspark, databricks, delta-lake

How to use MERGE INTO in Databricks (PySpark) when the source has null values to insert into a defined target schema (Delta table)


Background: I am building a JSON parser that can take in any shape of JSON and write it to a Delta table, with a schema that can evolve as new data / new columns come in.

I am trying to update an existing Databricks Delta table (the target) from an incoming JSON file (the source) using a MERGE INTO operation.

For example, let's say the existing table looks like this:

Target (existing Delta table)

Nested | ID | Fee
[{"code": null, "value": "11"}, {"code": null, "value": "16"}] | 47 | 57.00
null | 48 | null
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 51 | null

Source (incoming JSON)

Nested | ID | Fee
null | 47 | 36.00
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 52 | 17.00

Desired result (the row for ID 47 is upserted so Nested is now null and Fee is 36.00, and a new row for ID 52 is appended):

Nested | ID | Fee
null | 47 | 36.00
null | 48 | null
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 51 | null
[{"code": null, "value": "14"}, {"code": null, "value": "12"}, {"code": null, "value": "14"}] | 52 | 17.00

Right now, I am generating a SET string dynamically from the target and source schemas (they are hundreds of columns long in many cases, and this parser is generic across 70+ different JSON schemas). The target schema comes from the existing Delta table, and the source schema comes from the DataFrame that read in the JSON source. The source can contain columns that were not previously in the target schema, and I want those columns added to the table; that part works when setting spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true"). The problem is that columns which are entirely null cannot, to my knowledge, be automatically cast to the complex JSON structure. My current MERGE INTO statement with dynamic SET strings:

MERGE INTO {existing_table_name} t
USING {new_source_json} s
ON t.ID = s.ID
WHEN MATCHED THEN
  UPDATE SET {set_string}
WHEN NOT MATCHED THEN
  INSERT {set_string_insert}
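
For illustration, here is a simplified sketch of how a statement like this gets assembled and run from PySpark. The names existing_table_name, source_df, and the temp view are placeholders, not my exact code:

    # Allow new source columns to be added to the target schema during MERGE
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    existing_table_name = "my_schema.my_table"  # placeholder target table name
    # source_df is the DataFrame that read the incoming JSON file
    source_df.createOrReplaceTempView("new_source_json")

    # Build the dynamic SET / INSERT strings from the source columns
    cols = source_df.columns
    set_string = ", ".join(f"t.{c} = s.{c}" for c in cols)
    set_string_insert = f"({', '.join(cols)}) VALUES ({', '.join('s.' + c for c in cols)})"

    spark.sql(f"""
        MERGE INTO {existing_table_name} t
        USING new_source_json s
        ON t.ID = s.ID
        WHEN MATCHED THEN
          UPDATE SET {set_string}
        WHEN NOT MATCHED THEN
          INSERT {set_string_insert}
    """)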

I am getting the following error:

Failed to merge incompatible data types ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true) and StringType

This is because the target table's Nested column type is ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true), but the null value used to update ID 47 is read in as a string by Spark.
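
A quick way to see that inference in isolation (a minimal, standalone example, not my actual pipeline):

    # A JSON field whose values are all null is inferred as a plain string column
    sample = spark.read.json(spark.sparkContext.parallelize(
        ['{"Nested": null, "ID": 47, "Fee": 36.00}']
    ))
    sample.printSchema()
    # prints something like:
    # root
    #  |-- Fee: double (nullable = true)
    #  |-- ID: long (nullable = true)
    #  |-- Nested: string (nullable = true)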

My generated set_strings look like the following:

t.nested = s.nested, t.ID = s.ID, t.Fee = s.Fee

I have also tried to cast the column within the MERGE INTO statement itself:

MERGE INTO {existing_table_name} t
USING {new_source_json} s
ON t.ID = s.ID
WHEN MATCHED THEN
  UPDATE SET t.Nested = cast(s.Nested as array<struct<code:string,value:string>>), t.ID = s.ID, t.Fee = s.Fee
WHEN NOT MATCHED THEN
  INSERT (Nested, ID, Fee)
  VALUES (cast(s.Nested as array<struct<code:string,value:string>>), s.ID, s.Fee)

This still produces the same error. Do I have to cast the source DataFrame columns to the target types before merging? For 200+ columns, fixing each mismatched type one by one with withColumn would be rather tedious and expensive.
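
For reference, the kind of per-column fix-up I am hoping to avoid would look roughly like this (full_table_name and source_df stand in for my actual variables):

    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, MapType, StringType, StructType

    # Target types come from the existing Delta table
    target_types = {f.name: f.dataType for f in spark.table(full_table_name).schema.fields}

    for name, dtype in target_types.items():
        if name not in source_df.columns:
            continue
        src_type = source_df.schema[name].dataType
        if src_type == dtype:
            continue
        if isinstance(dtype, (ArrayType, MapType, StructType)) and isinstance(src_type, StringType):
            # An all-null (string-inferred) column parsed back into the complex target type
            source_df = source_df.withColumn(name, F.from_json(F.col(name), dtype))
        else:
            source_df = source_df.withColumn(name, F.col(name).cast(dtype))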

Is there a way to accomplish the desired result in Databricks using MERGE INTO or any other operation? I would prefer to avoid creating the schemas manually unless that is the only route forward.


Solution

  • I ended up working around the merge problem by using the existing table's schema to read in the new JSON file. If the new data differed from that schema (meaning a column's data type changed, or a new column was added to the JSON), I would then enable schema evolution to ensure the new column was captured. A code snippet for reading in the new JSON file with a schema taken from the existing table (the passed_schema variable):

    # The existing Delta table's schema becomes the read schema for the new file
    old_table = spark.table(f"{full_table_name}")
    passed_schema = old_table.schema

    def read_from_blob_as_pyspark_df_json_with_schema(file_path, container_name, storage_account_name, schema):
      # Read the JSON file from ADLS with the schema enforced instead of inferred
      spark_df = spark.read.format('json')\
        .schema(schema)\
        .load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_path}")
      return spark_df

    raw_json_df = read_from_blob_as_pyspark_df_json_with_schema(f, container_name, storage_account_name, passed_schema)


    Using the passed_schema together with the new raw_json_df, you can then perform the merge statement. If there are still mismatches, you can either add the new columns to the table manually or have another function enable schema evolution for the new columns; a sketch of that follow-up merge is below.
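
    A minimal sketch of that merge using the Delta Lake Python API (assuming the same full_table_name, the raw_json_df read above, and ID as the join key; the SQL MERGE INTO form works just as well):

      from delta.tables import DeltaTable

      # Optional: let brand-new source columns evolve the target schema during the merge
      spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

      target = DeltaTable.forName(spark, full_table_name)

      (target.alias("t")
          .merge(raw_json_df.alias("s"), "t.ID = s.ID")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute())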