
How to update a value in a delta table column in Azure Databricks with Map of Struct datatype?


I have a Delta table in Databricks named prod.silver.control_table. It has a few columns, including table_name with string data type and transform_options with the following structure:

 |-- transform_options: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- col_name_mappings: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- type_mappings: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- partition_duplicates_by: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- order_duplicates_by: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

For example, when table_name is prod.silver.weather, transform_options holds:

{
"prod.bronze.weather_source_a":{"col_name_mappings":{"col_a_old":"col_a_new","col_b_old":"col_b_new"},"type_mappings":{"col_a_new":"INT","col_b_new":"TIMESTAMP"},"partition_duplicates_by":["col_a_new"],"order_duplicates_by":["_commit_version"]},
"prod.bronze.weather_source_b":{"col_name_mappings":{"col_c_old":"col_c_new","col_d_old":"col_d_new"},"type_mappings":{"col_c_new":"INT","col_d_new":"TIMESTAMP"},"partition_duplicates_by":["col_c_new"],"order_duplicates_by":["ingestion_timestamp","_commit_version"]}
}

I need to update the values in order_duplicates_by: every occurrence of _commit_version should become commit_version, i.e. the leading underscore removed.

In the above example there are two key-value pairs in the transform_options map, but that is not guaranteed; some rows have only one.

Any idea how to update table values?

Note that I want to update the values in the control table itself. I would prefer a SQL command like the one below, but if there is a better way, please let me know:

UPDATE prod.silver.control_table
SET ...

I tried the below code:

UPDATE prod.silver.control_table
SET transform_options = 
    MAP(
        /* I iterate through each key-value pair in the original map */
        TRANSFORM_KEYS(transform_options, key -> key),
        TRANSFORM_VALUES(transform_options, (key, value) -> 
            /* then I create a new struct with updated order_duplicates_by */
            STRUCT(
                value.col_name_mappings AS col_name_mappings,
                value.type_mappings AS type_mappings,
                value.partition_duplicates_by AS partition_duplicates_by,
                /* Here I replace '_commit_version' with 'commit_version' in the array */
                TRANSFORM(value.order_duplicates_by, item -> 
                    CASE WHEN item = '_commit_version' THEN 'commit_version' ELSE item END
                ) AS order_duplicates_by
            )
        )
    )
WHERE table_name = 'prod.silver.weather';

But get an error:

com.databricks.backend.common.rpc.SparkDriverExceptions$SQLExecutionException: org.apache.spark.sql.AnalysisException: [INVALID_LAMBDA_FUNCTION_CALL.NUM_ARGS_MISMATCH]
    Invalid lambda function call. A higher order function expects 1 arguments, but got 2.; line 5 pos 42

Solution

  • The error comes from TRANSFORM_KEYS: its lambda must take two arguments, (key, value), but key -> key declares only one. You also don't need to rebuild the map with MAP() — which expects alternating key/value arguments, not two maps — because transform_values alone returns a new map with the same keys and transformed values:

    SELECT
        table_name,
        transform_values(transform_options, (k, v) ->
            CASE
                WHEN array_contains(v.order_duplicates_by, '_commit_version') THEN
                    STRUCT(
                        v.col_name_mappings,
                        v.type_mappings,
                        v.partition_duplicates_by,
                        array_append(
                            array_remove(v.order_duplicates_by, '_commit_version'),
                            'commit_version'
                        ) AS order_duplicates_by
                    )
                ELSE
                    STRUCT(v.col_name_mappings, v.type_mappings, v.partition_duplicates_by, v.order_duplicates_by)
            END
        ) AS updated_transform_options
    FROM control_table
    

    Here, transform_values iterates over the map's key-value pairs. For each value struct, if order_duplicates_by contains _commit_version, the struct is rebuilt with that element removed and commit_version appended; otherwise the old struct is kept unchanged. Note that array_remove followed by array_append places the renamed element at the end of the array; in the example data _commit_version is already the last element, so the ordering is unaffected.

    Output:

    table_name updated_transform_options
    prod.silver.weather {"prod.bronze.weather_source_a":{"col_name_mappings":{"col_a_old":"col_a_new","col_b_old":"col_b_new"},"type_mappings":{"col_a_new":"INT","col_b_new":"TIMESTAMP"},"partition_duplicates_by":["col_a_new"],"order_duplicates_by":["commit_version"]}}
    prod.silver.other_table {"prod.bronze.weather_source_b":{"col_name_mappings":{"col_x_old":"col_x_new","col_y_old":"col_y_new"},"type_mappings":{"col_x_new":"STRING","col_y_new":"DATE"},"partition_duplicates_by":["col_x_new"],"order_duplicates_by":["ingestion_timestamp","commit_version"]}}
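
    One caveat with the array_remove + array_append combination is that it always moves the renamed element to the end of the array. If _commit_version could appear in any other position and the ordering matters, a transform over the array renames the element in place instead — a sketch of the same SELECT with that change (same table name, control_table, as above):

    SELECT
        table_name,
        transform_values(transform_options, (k, v) ->
            STRUCT(
                v.col_name_mappings,
                v.type_mappings,
                v.partition_duplicates_by,
                /* rename in place, preserving the array order */
                transform(v.order_duplicates_by, item ->
                    CASE WHEN item = '_commit_version' THEN 'commit_version' ELSE item END
                ) AS order_duplicates_by
            )
        ) AS updated_transform_options
    FROM control_table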

    Updated code:

    UPDATE control_table
    SET transform_options = transform_values(transform_options, (k, v) ->
        CASE
            WHEN array_contains(v.order_duplicates_by, '_commit_version') THEN
                STRUCT(
                    v.col_name_mappings,
                    v.type_mappings,
                    v.partition_duplicates_by,
                    array_append(
                        array_remove(v.order_duplicates_by, '_commit_version'),
                        'commit_version'
                    ) AS order_duplicates_by
                )
            ELSE
                STRUCT(v.col_name_mappings, v.type_mappings, v.partition_duplicates_by, v.order_duplicates_by)
        END
    )
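
    After running the UPDATE, a quick check confirms that no _commit_version values remain — a sketch using Spark SQL's exists higher-order function, again assuming the table name control_table:

    SELECT table_name, transform_options
    FROM control_table
    WHERE exists(
        map_values(transform_options),
        v -> array_contains(v.order_duplicates_by, '_commit_version')
    );
    -- returns no rows once every _commit_version has been renamed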