I'm new to dbt and jinja but trying my best.
We have a scenario where when using incremental merge our destination table schema is different from our Source schema so we'd like to only update/insert the common columns.
I'm loading my source columns into a variable then sending it as a configuration value like so:
{%- set src_cols = adapter.get_columns_in_relation(ref('pre_Dim_Entities_Client')) -%}
{{
config(
materialized='incremental',
unique_key='Entity_ID',
source_columns = src_cols
)
}}
SELECT *
FROM {{ ref ('pre_Dim_Entities_Client')}}
Then I've overridden merge.sql macro:
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set temp = config.get('source_columns') -%}
{{- print(temp) -}}
{%- set dest_cols_csv = get_quoted_csv(config.get('source_columns', default = dest_columns) | map(attribute="name"))%}
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
{%- set sql_header = config.get('sql_header', none) -%}
I've assigned the config value to a temp var and printed it to confirm what I suspected was the issue - the columns are not passed in source_columns configuration as I would have expected.
What am I doing wrong, and alternatively- Is there a better way to go about this issue?
Self-answer here - Turns out passing variables in config block is problematic. Instead I sent the source object name as a config value and in the merge macro retrieved the columns and updated dest_columns to the retrieved object.
{{
config(
materialized='incremental',
unique_key='Entity_ID',
source_scehma = 'pre_Dim_Entities_Client'
)
}}
SELECT *
FROM {{ ref ('pre_Dim_Entities_Client')}}
And in the merge macro:
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set src_name = config.get('source_scehma', none) -%}
{% if src_name %}
{%- set dest_columns = adapter.get_columns_in_relation(ref(src_name)) -%}
{% endif %}
{%- set dest_cols_csv = get_quoted_csv( dest_columns | map(attribute="name")) -%}
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
{%- set sql_header = config.get('sql_header', none) -%}
##Note: In my solution I assume all source columns exist in the destination table.
If your need is to only take the common columns between source and destination the code should be modified accordingly.