postgresql airflow ddl dbt

Moving from DTL processes to DLT processes (using DBT)


I'm trying to move from a DTL script to a DLT DBT script. The problem is that one of my tables is also written to by other DTL processes. I read https://docs.getdbt.com/guides/migration/tools/migrating-from-stored-procedures/1-migrating-from-stored-procedures but couldn't figure out how to set up my final tables so that I can still write into them. Maybe somebody else has been stuck on this.

The flow is as shown in the picture. The int__log_client__deletes and int__log_client__do_delete models also have to write to the client_dim and audit_matching_log tables without deleting the tables.

The original int__log_client__do_delete script segment looks something like:

update data_warehouse.client_dim
       set is_deleted = true,
       deleted_at = current_timestamp
 where client_id in (select client_id from deletes)
   and source_id = 1
   returning 'delete' as type, client_id as key

The other original script is similar, but with an insert in it.

My current view is something like:

{{ config( materialized='????' ) }}
with

do_delete as
(
    select
        some_columns,
        true as to_delete,
        current_timestamp as deleted_at
    from {{  ref('stg__data_warehouse__client_dim') }}
    where client_id in (select client_id from {{ ref('int__log_client__deletes') }})
        and client_dim_source_id = 1
--    returning 'delete' as type, client_id as key
)

select * from do_delete

Thanks!


Solution

  • After hours of trial and error and running into cycle issues, I got it working using a post_hook config in

    int__data_warehouse__client_dim_with_deletes.sql

    Something like:

    {{
    config(
        post_hook="update {{ source('data_warehouse','client_dim') }}
                   set is_deleted = true,
                       deleted_at = {{ dbt_date.now() }}
                   where client_id in (select client_id from {{ this }})
                       and source_id = 1"
           )
     }}
    
    with
    do_delete as
    (
        select
            some_columns
        from {{  ref('stg__data_warehouse__client_dim') }}
        where client_id in (select client_id from {{ ref('int__log_client__deletes') }})
            and client_dim_source_id = 1
    --    returning 'delete' as type, client_id as key
    )
    
    select * from do_delete
    

    That's the working solution for my needs!
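
The question also asked for the updated rows to be written to audit_matching_log, which the hook above does not cover. Below is a minimal sketch of one possible extension, assuming PostgreSQL (per the question's tags), where a data-modifying CTE can pass the rows from the update's returning clause into an insert. The audit_matching_log source declaration and its type and key columns are assumptions inferred from the returning clause in the original script, not confirmed names, so adjust them to the real table.

```sql
{# Sketch only: assumes audit_matching_log is declared as a source under #}
{# data_warehouse and has columns named type and key (hypothetical).     #}
{{
config(
    post_hook="
        with updated as (
            update {{ source('data_warehouse', 'client_dim') }}
               set is_deleted = true,
                   deleted_at = current_timestamp
             where client_id in (select client_id from {{ this }})
               and source_id = 1
            returning 'delete' as type, client_id as key
        )
        insert into {{ source('data_warehouse', 'audit_matching_log') }} (type, key)
        select type, key from updated
    "
)
}}

with
do_delete as
(
    -- some_columns is the placeholder from the post above; the selected
    -- columns must include client_id, because the hook reads it from this model
    select
        some_columns
    from {{ ref('stg__data_warehouse__client_dim') }}
    where client_id in (select client_id from {{ ref('int__log_client__deletes') }})
        and client_dim_source_id = 1
)

select * from do_delete
```

Because the update and the insert run as a single SQL statement, the audit rows correspond exactly to the rows the update changed. post_hook also accepts a list of statements if two separate hooks are preferred. The model must not be ephemeral, since the hook queries {{ this }} as a real relation.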