Tags: merge, snowflake-cloud-data-platform, oracle-cdc

Merging CDC Updates Into Snowflake With Varying Updated Columns


I have CDC data coming in JSON format from an Oracle system into an S3 bucket. Inserts look like this:

{
  "operation": "I",
  "position": "00000000000010911785",
  "foo": 1,
  "bar": 678594,
  "cat_date": "2019-10-15 21:50:31.000000000",
  "dogs": 1388,
  "elephants": 72
}

But when there is an update, only the updated columns plus the PK and the CDC metadata are included in the record:

{
  "operation": "U",
  "position": "00000000000010911999",
  "bar": 678594,
  "dogs": 2500,
  "elephants": null
}

I am struggling to find a way to apply these updates in Snowflake. Not only can a null field be updated to a value (and vice versa, a value updated to null), but not all fields are included in an update record. That leaves no good way of performing the updates without literally looping through every single record, in order of the position field, and dynamically building a merge for just the columns included in each record:

MERGE INTO db.schema.target t
USING src_db.src_schema.src_tbl s
    ON t.bar = s.bar
WHEN MATCHED AND s.operation = 'U' THEN UPDATE SET
    {dynamic list of columns}
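
For the sample update above, for instance, the dynamically built statement would have to set only the two columns actually present in that record (bar being the primary key):

MERGE INTO db.schema.target t
USING src_db.src_schema.src_tbl s
    ON t.bar = s.bar
WHEN MATCHED AND s.operation = 'U' THEN UPDATE SET
    t.dogs = s.dogs,            -- 1388 -> 2500
    t.elephants = s.elephants   -- 72 -> NULL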

Is there a better way to do this?


Solution

  • The only real way to handle this scenario was by batching the records. We ended up creating batches off the unique key for the table, ordered by the position field provided in the CDC record:

    SELECT *, ROW_NUMBER() OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION ASC) AS BATCHID
    FROM TABLE_STREAM
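
    For example, with the two sample records above (an insert followed by an update to the same bar key, assumed here to be the primary key), the query would assign:

    POSITION              OPERATION  BAR     BATCHID
    00000000000010911785  I          678594  1
    00000000000010911999  U          678594  2

    Each batch is then applied as a separate pass, so successive changes to the same key land in position order.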
    

    Then creating a temporary table off of that batch-creation query via RESULT_SCAN, so the stream doesn't flush yet (a CREATE TABLE AS SELECT directly against the stream would advance its offset; a plain SELECT does not):

    CREATE OR REPLACE TEMPORARY TABLE TEMP_TABLE
    AS
    SELECT *  
    FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
    

    Then processing the different operations out of the temp table (which carries the BATCHID and avoids consuming the stream mid-process), in order: inserts, then updates, then deletes:

    --INSERTS
    INSERT INTO TABLE_NAME
    SELECT COLUMNS
    FROM TEMP_TABLE
    WHERE OPERATION = 'I'
    AND BATCHID = BATCH_COUNTER

    --UPDATES
    MERGE INTO TABLE_NAME t
    USING (
       SELECT *
       FROM TEMP_TABLE
       WHERE OPERATION = 'U' AND BATCHID = BATCH_COUNTER
    ) s
    ON JOIN_CONDITION
    WHEN MATCHED THEN UPDATE SET
    UPDATE_FIELD_LIST

    --DELETES
    DELETE FROM TABLE_NAME t
    USING (
       SELECT * FROM TEMP_TABLE
       WHERE OPERATION = 'D' AND BATCHID = BATCH_COUNTER
    ) s
    WHERE JOIN_CONDITION
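
    One way to fill in the UPDATE_FIELD_LIST without resorting to fully dynamic SQL is to keep the original JSON in a VARIANT column on the temp table (call it RAW; this is an assumption, not something shown above) and exploit the fact that Snowflake returns SQL NULL for a missing key but a VARIANT null for an explicit JSON null. A sketch using the sample columns:

    --SKETCH (assumes a hypothetical RAW VARIANT column holding the original CDC JSON)
    MERGE INTO TABLE_NAME t
    USING (
       SELECT RAW, RAW:bar::NUMBER AS bar
       FROM TEMP_TABLE
       WHERE OPERATION = 'U' AND BATCHID = BATCH_COUNTER
    ) s
    ON t.bar = s.bar
    WHEN MATCHED THEN UPDATE SET
       t.foo       = IFF(s.RAW:foo       IS NULL, t.foo,       s.RAW:foo::NUMBER),        -- key absent: keep old value
       t.cat_date  = IFF(s.RAW:cat_date  IS NULL, t.cat_date,  s.RAW:cat_date::TIMESTAMP_NTZ),
       t.dogs      = IFF(s.RAW:dogs      IS NULL, t.dogs,      s.RAW:dogs::NUMBER),
       t.elephants = IFF(s.RAW:elephants IS NULL, t.elephants, s.RAW:elephants::NUMBER)   -- JSON null casts to SQL NULL

    A missing key makes RAW:col a SQL NULL (keep the old value), while an explicit "elephants": null is a VARIANT null that casts to SQL NULL (overwrite).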
    

    Then flushing the stream at the end, once everything has been processed successfully. If the stream had been flushed at the beginning and an error occurred partway through, you would have to restart the whole process over again. The flush works because any CREATE TABLE AS SELECT that reads the stream advances its offset; the LIMIT 1 just keeps the throwaway table small:

    --STREAM FLUSH
    CREATE OR REPLACE TEMPORARY TABLE TEMP_FLUSH_TABLE_STREAM
    AS
    SELECT *
    FROM TABLE_STREAM LIMIT 1
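
    Putting it together, the per-batch loop can be driven by a client, or (one possible approach, if Snowflake Scripting is available) by an anonymous block; TABLE_NAME, TEMP_TABLE, and the column list are the placeholders and sample columns from above:

    --SKETCH: driving the batch loop with Snowflake Scripting
    DECLARE
      max_batch INTEGER;
    BEGIN
      SELECT MAX(BATCHID) INTO :max_batch FROM TEMP_TABLE;
      FOR batch_counter IN 1 TO max_batch DO
        INSERT INTO TABLE_NAME
        SELECT foo, bar, cat_date, dogs, elephants
        FROM TEMP_TABLE
        WHERE OPERATION = 'I' AND BATCHID = :batch_counter;
        -- the MERGE for 'U' and the DELETE for 'D' shown above go here,
        -- with BATCH_COUNTER replaced by :batch_counter
      END FOR;
    END;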