Search code examples
sqlsnowflake-cloud-data-platformetl

Snowflake SQL to Extract Data from snowflake.account_usage.copy_history view as Delta Load


I have a requirement where I'm trying to extract all the snowflake's copy history data from snowflake.account_usage.copy_history view as a delta load with a control table to store the current execution date and doing a delta based on last_load_time from the Snowflake view.

But the merge query is not supported with cte's. I can just select which I need to although. So only the select is working but I'm not able to update my control table with the latest timestamp

   WITH LastLoad AS (
    SELECT
        MAX(LAST_TIMESTAMP) AS last_load_time
    FROM
        edw_dev.meta.adf_load_control
    where
        source_entity = 'COPY_HISTORY'
),
CopyHistoryDelta AS (
    SELECT
        ch.*,
        CURRENT_TIMESTAMP AS updated_load_time
    FROM
        snowflake.account_usage.copy_history ch
        INNER JOIN LastLoad ll ON ch.LAST_LOAD_TIME > ll.last_load_time
) 
MERGE INTO edw_dev.meta.adf_load_control ut 
USING (
    SELECT
        updated_load_time
    FROM
        CopyHistoryDelta
) AS new_data 
ON ut.source_entity = 'COPY_HISTORY'
WHEN MATCHED 
    THEN
        UPDATE
        SET
            LAST_TIMESTAMP = new_data.updated_load_time
WHEN NOT MATCHED 
    THEN
        INSERT
            (last_load_time)
        VALUES
            (new_data.updated_load_time);

Sorry the formatting seems to be not working


Solution

  • In snowflake merge you have to use cte inside using clause itself like below,

    MERGE INTO edw_dev.meta.adf_load_control ut 
    USING (
        WITH LastLoad AS (
            SELECT
                MAX(LAST_TIMESTAMP) AS last_load_time
            FROM
                edw_dev.meta.adf_load_control
            where
                source_entity = 'COPY_HISTORY'
        ),
        CopyHistoryDelta AS (
            SELECT
                ch.*,
                CURRENT_TIMESTAMP AS updated_load_time
            FROM
                snowflake.account_usage.copy_history ch
                INNER JOIN LastLoad ll ON ch.LAST_LOAD_TIME > ll.last_load_time
        ) 
        SELECT
            updated_load_time
        FROM
            CopyHistoryDelta
    ) AS new_data 
    ON ut.source_entity = 'COPY_HISTORY'
    WHEN MATCHED 
        THEN
            UPDATE
            SET
                LAST_TIMESTAMP = new_data.updated_load_time
    WHEN NOT MATCHED 
        THEN
            INSERT
                (last_load_time)
            VALUES
                (new_data.updated_load_time);