Search code examples
azuredatabricks

Azure Databricks: Not able to parameterize keys option of dlt.apply_changes


I have created a metadata-driven DLT notebook that reads its configuration from tables and loads data from ADLS Gen2 into DLT tables. I am able to parameterize all the other options, such as target, source, sequence_by, etc., but I am running into a problem when parameterizing the keys option. When the logic executes with the parameter value passed as 'pkey', Spark splits the value into individual characters and tries to use each character as a column name, so I receive the following error: '[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name [ cannot be resolved. Did you mean one of the following? [Name, pkey, Value, dq_check, Description]. SQLSTATE: 42703; 'Project ['[, '', 'p, 'k, 'e, 'y, '', ']]'.

---SQL script for config tables

    -- Configuration table read by the pipeline script (filtered by SourceGroupId).
    DROP TABLE IF EXISTS XyzBank.MetaData.SourceMetaData;
    CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceMetaData
    (
    SourceMetaDataId BIGINT GENERATED ALWAYS AS IDENTITY
    ,SourceFileName STRING
    ,SourceFilePath STRING
    ,SourceFileFormat STRING
    ,SourceActive STRING
    ,SourceDelimeter STRING
    ,SourceHeader STRING
    ,SourcePartitionColumn STRING
    ,SourceDataQuality STRING      -- JSON object: rule name -> SQL boolean expression
    ,SourceTransformation STRING
    ,SourceTransformFunc STRING
    ,SourceFileOptions STRING      -- reader options; the sample value is a Python dict literal (True, not true)
    ,SourceTableName STRING
    ,SourceSchemaName STRING
    ,ScdExceptColumnList STRING
    ,ScdType    INT
    ,SequenceBy STRING
    ,Keys  STRING                  -- key column names stored as a STRING, e.g. '["pkey"]'
    ,SourceGroupId  INT
    ,CreatedOn TIMESTAMP
    ,CreatedBy STRING
    ,ModifiedOn TIMESTAMP
    ,ModifiedBy STRING
    );

    -- Sample configuration row for the Product source.
    -- Each statement is terminated with ';' so the script can run as a single multi-statement script.
    INSERT INTO XyzBank.MetaData.SourceMetaData (SourceFilePath,SourceFileFormat,SourceActive,SourceDelimeter,SourceHeader,SourceDataQuality,SourceFileOptions,SourceTableName,ScdType,SequenceBy,Keys,SourceGroupId,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
    SELECT 'abfss://[email protected]/DLT/', 'csv', 'True', ',', 'True', '{"validate Description":"(Description is NOT NULL)","validate Name":"(Name is NOT NULL)"}', '{"cloudFiles.format":"csv","header":True}', 'Product', 2,'file_process_date','["pkey"]',1, current_timestamp(), 'ABC', current_timestamp(), 'ABC';
    
    
    
    
    -- Per-source column definitions; the pipeline script selects rows by SourceMetaDataId
    -- and builds the read schema from ColumnName / ColumnDataType / ColumnOrder.
    DROP TABLE IF EXISTS XyzBank.MetaData.SourceSchemaConfig;
    CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceSchemaConfig
    (
    SourceSchemaConfigId BIGINT GENERATED ALWAYS AS IDENTITY
    ,SourceMetaDataId   INT
    ,ColumnName STRING
    ,ColumnDataType STRING         -- type name such as 'IntegerType' or 'StringType'
    ,ColumnOrder INT
    ,IsNullable STRING
    ,IsSensitive STRING
    ,CreatedOn TIMESTAMP
    ,CreatedBy STRING
    ,ModifiedOn TIMESTAMP
    ,ModifiedBy STRING
    );

    -- Sample columns for SourceMetaDataId = 1.
    -- NOTE(review): assumes the identity column of the matching SourceMetaData row is 1 - confirm.
    -- Each statement is terminated with ';' so the script can run as a single multi-statement script.
    INSERT INTO XyzBank.MetaData.SourceSchemaConfig (SourceMetaDataId,ColumnName,ColumnDataType,ColumnOrder,IsNullable,IsSensitive,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
    SELECT 1, 'pkey', 'IntegerType', 1, 'False', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
    SELECT 1, 'Description', 'StringType', 2, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
    SELECT 1, 'Name', 'StringType', 3, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
    SELECT 1, 'Value', 'StringType', 4, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC';

------end of SQL script

#pyspark script

    import dlt 
    from pyspark.sql import SparkSession 
    from pyspark.sql.functions import col, lit, expr 
    from pyspark.sql import Row 
    import json 
    from pyspark.sql import functions as F 
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DateType, DoubleType

    # Metadata-driven DLT pipeline: for every configuration row in group 1, build a
    # read schema from the schema-config table and define a bronze table, a staging
    # silver table, an SCD silver table (via apply_changes) and a quarantine table.
    df = spark.sql("SELECT * FROM xyzbank.metadata.sourcemetadata WHERE SourceGroupId = 1")
    for row in df.collect():
        #print(row['SourceMetaDataId'])
        # Column definitions that belong to this source.
        schema_query = f"SELECT * FROM xyzbank.metadata.sourceschemaconfig WHERE SourceMetaDataId = {row['SourceMetaDataId']}"
        #print(schema_query)
        df_schema = spark.sql(schema_query)
        #display(df_schema)
        
        
        # Maps the ColumnDataType strings stored in the config table to Spark types.
        # Note that "TimeType" maps to TimestampType and that both "Datetime" and
        # "DateType" map to DateType. An unknown name raises KeyError below.
        data_type_mapping = {
            "StringType": StringType(),
            "IntegerType": IntegerType(),
            "TimeType": TimestampType(),
            "Datetime": DateType(),
            "DoubleType": DoubleType(),
            "DateType": DateType()
        }
    
        # Collect the distinct ("ColumnDataType", "ColumnName", "ColumnOrder") combinations
        distinct_datatypes = (
            df_schema.select("ColumnDataType", "ColumnName", "ColumnOrder").distinct().collect()
        )
    
        # Sort the column definitions by "ColumnOrder" so the schema follows the configured order
        distinct_datatypes = sorted(distinct_datatypes, key=lambda x: x.ColumnOrder)
    
        # Create schema fields. Every field is declared nullable (True); the IsNullable
        # config column is not read here. The comprehension's `row` is local to the
        # comprehension and does not overwrite the outer loop variable.
        schema_fields = [
            StructField(row.ColumnName, data_type_mapping[row.ColumnDataType], True)
            for row in distinct_datatypes
        ]
    
        # Build the schema used when reading the source files
        schema = StructType(schema_fields)
        
        
        
        display(row)
        #dlt_ingestion_metdata_function(row=row, schema=schema)
        table_name = row['SourceTableName']
        # SourceDataQuality holds a JSON object; in the sample data it maps a rule
        # name to a SQL boolean expression.
        checks = row['SourceDataQuality']
        checks = json.loads(checks)
        # NOTE(review): Keys is a STRING column (the sample row stores '["pkey"]'), so
        # `keys` is a str here, not a list, and it is passed unchanged to
        # apply_changes below.
        keys = row['Keys']
        display(keys)
        #keys = ["pkey"]
        print(keys)
        sequence_by = row['SequenceBy']
        display(sequence_by)
        file_path = row['SourceFilePath']
        # eval() executes the stored text as Python. The sample value uses the Python
        # literal True, so it is not valid JSON.
        # NOTE(review): only safe if the config table is fully trusted.
        cloud_file_options = eval(row['SourceFileOptions'])
        # Combine all rule expressions into one predicate. The separator is "AND"
        # with no surrounding spaces; the sample expressions are each parenthesized.
        dq_rules = "({0})".format("AND".join(checks.values()))
    
        # NOTE(review): the functions below read loop variables (table_name, schema,
        # file_path, cloud_file_options, dq_rules) when they are called, not when they
        # are defined. If DLT invokes them after the loop has finished and the query
        # returns more than one row, they would all see the last row's values - confirm.

        # Bronze: stream the raw files with the "cloudFiles" source using the
        # configured options and schema, and stamp each row with the load time.
        @dlt.table(
            name = "brz_load_"+table_name
        )
        def bronze_load():
            df3 = spark.readStream.format("cloudFiles").options(**cloud_file_options).schema(schema).load(file_path)
            df3 = df3.withColumn("file_process_date", F.current_timestamp())
            return df3
        
        # Staging silver: rows from bronze for which the combined rule predicate is true.
        @dlt.table(
            name = "stag_silver_load_"+table_name
        )
        @dlt.expect_all(checks)
        def stag_silver_table():
            df3 = dlt.readStream("brz_load_"+table_name)
            df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=true")
            return df3
        
    
        # Silver: target streaming table populated from staging silver via apply_changes.
        dlt.create_streaming_table(
            name = "silver_load_"+table_name
        )
        # stored_as_scd_type is hard-coded to 2; the ScdType config column is not read.
        dlt.apply_changes(
            target = "silver_load_"+table_name,
            source = "stag_silver_load_"+table_name,
            keys=keys,
            stored_as_scd_type=2,
            sequence_by=sequence_by
        )
    
        # Quarantine: rows from bronze for which the combined rule predicate is false.
        @dlt.table(
            name = "quarantine_silver_load_"+table_name
        )
        @dlt.expect_all(checks)
        def quarantine_silver_table():
            df3 = dlt.readStream("brz_load_"+table_name)
            df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=false")
            return df3
        
    

 #end of pyspark script

Solution

  • The Keys value you insert into XyzBank.MetaData.SourceMetaData is a string, '["pkey"]', not a list. Convert it to a list of strings before passing it to apply_changes.

    Use the code below to convert it.

    import json

    # Keys is stored as a JSON string such as '["pkey"]'. Decode it so that
    # apply_changes receives a list of column names; a plain string is split
    # into single characters, which produces the UNRESOLVED_COLUMN error.
    keys = json.loads(row['Keys'])
    # A single key stored as '"pkey"' decodes to a bare str; wrap it so that
    # keys is always a list.
    if isinstance(keys, str):
        keys = [keys]
    

    If the input is always a string of the form '["pkey"]', the code above gives you a list of strings. If the stored format can vary, convert it accordingly so that apply_changes always receives keys as a list of strings containing the key column names.