I am new to dbt and SQL in general. I am building an incremental model in dbt that runs daily. I have a table in Snowflake that looks like this:
Nat_K1 | Nat_k2 | Product | Supplier | Component | Metadata |
---|---|---|---|---|---|
6231 | ~~ | Toy Car | ToysRus | Base | {Hash:2bbd4cb604298556a40f16c50218b226, Load_time: 01:01:2023} |
6231 | ~~ | Toy Car | ToysRus | Wheels | {Hash:fd827e6fe9105024dfc5e58d21cde9bd, Load_time: 01:01:2023} |
6231 | ~~ | Toy Car | ToysRus | Remote | {Hash:6dfddb68e3219fa66af182f19b1c2ddf, Load_time: 01:01:2023} |
I have a view that gets data from my source. The incremental model is a CTAS using this view. I want to make sure no duplicates are added. How do I make dbt check the hash values already in the table before inserting? The hash is computed from Product, Supplier and Component and is part of the Metadata variant column.
For example, if I do dbt run the next day, this row would be added, which I want to avoid:
Nat_K1 | Nat_k2 | Product | Supplier | Component | Metadata |
---|---|---|---|---|---|
6231 | ~~ | Toy Car | ToysRus | Base | {Hash:2bbd4cb604298556a40f16c50218b226, Load_time: 02:01:2023} |
I am unsure of the best approach to handle this.
Incremental models will automatically update records based on the unique_key. However, if you want to keep the original load timestamp, you have to get a little more creative.
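To make the unique_key behavior concrete: on Snowflake, dbt's default incremental strategy is a merge, so a run with unique_key set behaves roughly like the statement below. This is a simplified sketch, not the exact SQL dbt generates; MY_MODEL and MY_MODEL__TMP are hypothetical names for the target table and for the staged result of the model's SELECT.

```sql
-- Simplified sketch of an incremental run with unique_key = 'HASH_FIELD'.
-- MY_MODEL (target) and MY_MODEL__TMP (staged new rows) are hypothetical names.
MERGE INTO MY_MODEL AS DEST
USING MY_MODEL__TMP AS SRC
    ON SRC.HASH_FIELD = DEST.HASH_FIELD
-- A row with the same hash already exists: every column is overwritten,
-- including LOAD_TIME.
WHEN MATCHED THEN UPDATE SET
    NAT_K1    = SRC.NAT_K1,
    NAT_K2    = SRC.NAT_K2,
    PRODUCT   = SRC.PRODUCT,
    SUPPLIER  = SRC.SUPPLIER,
    COMPONENT = SRC.COMPONENT,
    LOAD_TIME = SRC.LOAD_TIME
-- The hash is new: the row is inserted.
WHEN NOT MATCHED THEN INSERT
    (HASH_FIELD, NAT_K1, NAT_K2, PRODUCT, SUPPLIER, COMPONENT, LOAD_TIME)
VALUES
    (SRC.HASH_FIELD, SRC.NAT_K1, SRC.NAT_K2, SRC.PRODUCT, SRC.SUPPLIER, SRC.COMPONENT, SRC.LOAD_TIME);
```

So with unique_key alone you get no duplicate rows, but the load time of an existing row is replaced on every run unless the model's SELECT preserves it.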
Option 1: Join with self
{{
    config({
        "materialized": 'incremental',
        "unique_key": 'HASH_FIELD',
    })
}}
SELECT
    S.HASH_FIELD,
    S.NAT_K1,
    S.NAT_K2,
    S.PRODUCT,
    S.SUPPLIER,
    S.COMPONENT,
    {% if is_incremental() %}
    -- Keep the stored load time for existing rows; stamp new rows with the current time
    IFNULL(T.LOAD_TIME, CURRENT_TIMESTAMP()) AS LOAD_TIME
    {% else %}
    -- First run or full refresh: every row is new
    CURRENT_TIMESTAMP() AS LOAD_TIME
    {% endif %}
FROM {{ ref('source') }} S
{% if is_incremental() %}
LEFT JOIN {{ this }} T ON S.HASH_FIELD = T.HASH_FIELD
{% endif %}
Option 2: Use not exists
{{
    config({
        "materialized": 'incremental',
        "unique_key": 'HASH_FIELD',
    })
}}
SELECT
    S.HASH_FIELD,
    S.NAT_K1,
    S.NAT_K2,
    S.PRODUCT,
    S.SUPPLIER,
    S.COMPONENT,
    CURRENT_TIMESTAMP() AS LOAD_TIME
FROM {{ ref('source') }} S
{% if is_incremental() %}
-- Only select rows whose hash is not already in the target table
WHERE NOT EXISTS (SELECT 1 FROM {{ this }} T WHERE T.HASH_FIELD = S.HASH_FIELD)
{% endif %}
Use option 1 if you need to change other fields that may have updated but you don't want to change the load time. Use option 2 if you want to only load new records.
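Both options assume the hash is available as its own column, HASH_FIELD. In your table it sits inside the Metadata variant, so the view feeding the model would need to expose it first. A minimal sketch, assuming the variant key is named Hash exactly as displayed (variant keys are case-sensitive in Snowflake) and that raw_table is a hypothetical name for whatever your view reads from:

```sql
-- Sketch of the source view: pull the hash out of the variant column.
-- raw_table is a hypothetical upstream model name.
SELECT
    METADATA:Hash::STRING AS HASH_FIELD,  -- path lookup into the variant, cast to text
    NAT_K1,
    NAT_K2,
    PRODUCT,
    SUPPLIER,
    COMPONENT
FROM {{ ref('raw_table') }}
```

If the target table also stores the hash only inside Metadata, the same METADATA:Hash::STRING expression could be used on the {{ this }} side of the join or NOT EXISTS instead of a separate column.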
EDIT: I assume the timestamp you're looking to use is not available in the source table. However, if it is, then you can use the common strategy for incremental models: select only the source rows whose load timestamp is later than the latest one already in the target table. Something like this:
SELECT ...
FROM {{ ref('source') }} S
{% if is_incremental() %}
WHERE S.LOAD_TIME > (SELECT MAX(LOAD_TIME) FROM {{ this }})
{% endif %}
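One edge case with this filter: if the target table exists but is empty, MAX(LOAD_TIME) returns NULL and the comparison matches no rows. A possible variant with a fallback is sketched below, assuming LOAD_TIME is a timestamp column in both the source and the target; the 1900-01-01 floor and the S.* column list are arbitrary placeholders for illustration.

```sql
SELECT S.*  -- placeholder column list
FROM {{ ref('source') }} S
{% if is_incremental() %}
WHERE S.LOAD_TIME > (
    -- Fall back to an early date when the target has no rows yet
    SELECT COALESCE(MAX(LOAD_TIME), TO_TIMESTAMP('1900-01-01'))
    FROM {{ this }}
)
{% endif %}
```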