
dbt: only insert/merge columns that exist in the source object


I'm new to dbt and Jinja, but trying my best.

We have a scenario where, when using an incremental merge, our destination table schema differs from our source schema, so we'd like to update/insert only the columns the two have in common.

I'm loading my source columns into a variable, then passing it as a configuration value like so:

{%- set src_cols = adapter.get_columns_in_relation(ref('pre_Dim_Entities_Client')) -%}

{{
    config(
        materialized='incremental',
        unique_key='Entity_ID',
        source_columns = src_cols
    )
}}

SELECT *
FROM {{ ref ('pre_Dim_Entities_Client')}}

Then I overrode the default__get_merge_sql macro from merge.sql:

{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
    {%- set predicates = [] if predicates is none else [] + predicates -%}
    {%- set temp = config.get('source_columns') -%}
    {{- print(temp) -}}
    {%- set dest_cols_csv = get_quoted_csv(config.get('source_columns', default = dest_columns) | map(attribute="name"))%}
    {%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

I assigned the config value to a temp variable and printed it, which confirmed what I suspected was the issue: the columns are not passed through the source_columns config as I expected.

What am I doing wrong? Alternatively, is there a better way to go about this?


Solution

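  • Self-answer: it turns out that passing a variable through the config block this way is problematic, presumably because config() is evaluated when dbt parses the project, and dbt does not run queries against the warehouse while parsing. Instead, I passed the source model's name as a config value; then, in the merge macro, I retrieved that model's columns and replaced dest_columns with them.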

    {{
        config(
            materialized='incremental',
            unique_key='Entity_ID',
            source_scehma = 'pre_Dim_Entities_Client'
        )
    }}
    
    SELECT *
    FROM {{ ref ('pre_Dim_Entities_Client')}}
    

    And in the merge macro:

    {% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
        {%- set predicates = [] if predicates is none else [] + predicates -%}
        {%- set src_name = config.get('source_scehma', none) -%}
        {% if src_name %}
            {%- set dest_columns = adapter.get_columns_in_relation(ref(src_name)) -%}
        {% endif %}
        {%- set dest_cols_csv = get_quoted_csv( dest_columns | map(attribute="name")) -%}
        {%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
        {%- set sql_header = config.get('sql_header', none) -%}
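For context, the rest of the macro can stay as it is in dbt's default. The fragment below is roughly how the remainder of default__get_merge_sql looks in dbt-core around v1.0 (it changed in later versions, so compare it with the merge.sql in your installed dbt before copying). It shows why swapping dest_columns is enough: dest_cols_csv feeds the INSERT column and VALUES lists, and update_columns (which defaults to every column in dest_columns) feeds the UPDATE SET list.

```sql
    {#- unique_key becomes the join predicate; with no key, nothing matches and every row is inserted -#}
    {% if unique_key %}
        {% set unique_key_match %}
            DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
        {% endset %}
        {% do predicates.append(unique_key_match) %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{ predicates | join(' and ') }}

    {#- update_columns: merge_update_columns if configured, otherwise all of dest_columns -#}
    {% if unique_key %}
    when matched then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
    {% endif %}

    {#- dest_cols_csv: quoted, comma-separated names of dest_columns -#}
    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

{% endmacro %}
```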
    

    Note: my solution assumes that every source column also exists in the destination table.

    If you need only the columns common to both the source and the destination, the code has to be modified accordingly.
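One way to do that, sketched here under the assumption that you keep the same source_scehma config key as in the model above: instead of replacing dest_columns with the source columns, filter dest_columns (the destination table's columns) down to those whose names also appear in the source. This is an untested sketch; the case-insensitive name comparison is my own assumption, since warehouses differ in how they report identifier case.

```sql
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
    {%- set predicates = [] if predicates is none else [] + predicates -%}
    {%- set src_name = config.get('source_scehma', none) -%}
    {% if src_name %}
        {#- Column names of the source model, lower-cased for a case-insensitive comparison (assumption) -#}
        {%- set src_names = adapter.get_columns_in_relation(ref(src_name))
                            | map(attribute='name') | map('lower') | list -%}
        {#- Keep only destination columns that also exist in the source, preserving destination order -#}
        {%- set common_columns = [] -%}
        {%- for col in dest_columns -%}
            {%- if col.name | lower in src_names -%}
                {%- do common_columns.append(col) -%}
            {%- endif -%}
        {%- endfor -%}
        {%- set dest_columns = common_columns -%}
    {% endif %}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
    {%- set sql_header = config.get('sql_header', none) -%}
    {#- ... rest of the macro unchanged ... -#}
```

The list is built with do/append because a plain set inside a Jinja for loop does not survive the end of the loop, whereas mutating a list created outside the loop does. Keeping the destination's Column objects (rather than the source's) means their quoted attribute still refers to the destination's column names.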