I have CDC data coming in JSON format from an Oracle system into an S3 bucket. An insert record looks like this:
{
  "operation": "I",
  "position": "00000000000010911785",
  "foo": 1,
  "bar": 678594,
  "cat_date": "2019-10-15 21:50:31.000000000",
  "dogs": 1388,
  "elephants": 72
}
But when there is an update, only the updated columns + the PK + the CDC metadata are included in the record:
{
  "operation": "U",
  "position": "00000000000010911999",
  "bar": 678594,
  "dogs": 2500,
  "elephants": null
}
I am struggling to find a way to apply these records in Snowflake. Not only can a null column be updated to a value (and vice versa, a value updated to null), but not every field is included in an update record. So there doesn't seem to be a good way of performing the updates without literally looping through every single record, in order of the position field, dynamically building and running a merge for just the columns that are included in the file.
MERGE INTO db.schema.target t
USING src_db.src_schema.src_tbl s ON t.bar = s.bar
WHEN MATCHED and s.operation = 'U' THEN UPDATE SET
{dynamic list of columns}
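For illustration, assuming I also keep the raw JSON in a VARIANT column (raw_rec is a made-up name here), I can at least see which columns each record actually carries:
SELECT s.position, OBJECT_KEYS(s.raw_rec) AS columns_in_record
FROM src_db.src_schema.src_tbl s
ORDER BY s.position
But that still points toward per-record dynamic SQL.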
Is there a better way to do this?
The only real way we found to handle this scenario was to batch the records. We ended up creating batches off the unique key for the table, ordered by the position field provided in the CDC record; that way each batch contains at most one change per key, and the changes for a key get applied in the order they occurred:
SELECT *, ROW_NUMBER() OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION ASC) AS BATCHID
FROM TABLE_STREAM
Then we created a temporary table off of that batch-creation query, using RESULT_SCAN so the stream doesn't get flushed yet:
CREATE OR REPLACE TEMPORARY TABLE TEMP_TABLE
AS
SELECT *
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
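The statements below then get run once per batch, with BATCH_COUNTER going from 1 up to the highest BATCHID. One way to drive that is a Snowflake Scripting loop (a sketch only; any orchestration that runs the three statements once per batch works, and the names are the placeholders from this answer plus the question's columns):
DECLARE
  MAX_BATCH INTEGER;
BEGIN
  SELECT MAX(BATCHID) INTO :MAX_BATCH FROM TEMP_TABLE;
  FOR BATCH_COUNTER IN 1 TO MAX_BATCH DO
    -- Insert pass for this batch; the update and delete passes below follow the
    -- same pattern, binding :BATCH_COUNTER wherever BATCH_COUNTER appears.
    INSERT INTO TABLE_NAME (FOO, BAR, CAT_DATE, DOGS, ELEPHANTS)
    SELECT FOO, BAR, CAT_DATE, DOGS, ELEPHANTS
    FROM TEMP_TABLE
    WHERE OPERATION = 'I'
      AND BATCHID = :BATCH_COUNTER;
  END FOR;
END;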
Within each batch, the operations are then processed in order: inserts, then updates, then deletes:
--INSERTS
INSERT INTO TABLE_NAME
SELECT COLUMNS
FROM TEMP_TABLE SRC
WHERE OPERATION = 'I'
AND BATCHID = BATCH_COUNTER
--UPDATES
MERGE INTO TABLE_NAME t
USING (
SELECT *
FROM TEMP_TABLE
WHERE OPERATION = 'U' AND BATCHID = BATCH_COUNTER
) s
ON JOIN_CONDITION
WHEN MATCHED THEN UPDATE SET
UPDATE_FIELD_LIST
--DELETES
DELETE FROM TABLE_NAME T
USING (
SELECT * FROM TEMP_TABLE
WHERE OPERATION = 'D' AND BATCHID = BATCH_COUNTER
) AS s
WHERE JOIN_CONDITION
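For UPDATE_FIELD_LIST, the question's missing-column vs. explicit-null problem can be handled if the raw JSON record is also carried along as a VARIANT column (RAW_REC below is an assumed name, not something the steps above produce on their own): a column that is absent from the record comes back as SQL NULL, while a column explicitly set to null shows up as a JSON null that IS_NULL_VALUE can detect. A sketch using two of the question's columns, with BAR as the key:
MERGE INTO TABLE_NAME t
USING (
    SELECT *
    FROM TEMP_TABLE
    WHERE OPERATION = 'U' AND BATCHID = :BATCH_COUNTER  -- bound from the driver loop above
) s
ON t.BAR = s.RAW_REC:bar::NUMBER
WHEN MATCHED THEN UPDATE SET
    -- absent key -> keep the current value
    -- JSON null  -> overwrite with NULL
    -- real value -> take the new value
    t.DOGS = CASE
                 WHEN s.RAW_REC:dogs IS NULL THEN t.DOGS
                 WHEN IS_NULL_VALUE(s.RAW_REC:dogs) THEN NULL
                 ELSE s.RAW_REC:dogs::NUMBER
             END,
    t.ELEPHANTS = CASE
                      WHEN s.RAW_REC:elephants IS NULL THEN t.ELEPHANTS
                      WHEN IS_NULL_VALUE(s.RAW_REC:elephants) THEN NULL
                      ELSE s.RAW_REC:elephants::NUMBER
                  END
The same pattern extends to the remaining columns, and because each batch holds at most one change per key, the merges replay the changes in POSITION order.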
Finally, flush the stream only at the end, once everything has been processed successfully. If the stream were flushed at the beginning and an error occurred partway through, you would have to restart the whole process over again.
--STREAM FLUSH
--Any DML that selects from the stream advances the offset past all pending records,
--so a CTAS with LIMIT 1 is enough to consume it cheaply.
CREATE OR REPLACE TEMPORARY TABLE TEMP_FLUSH_TABLE_STREAM
AS
SELECT *
FROM TABLE_STREAM LIMIT 1
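If you want to confirm the offset actually advanced, SYSTEM$STREAM_HAS_DATA can be checked afterwards (TABLE_STREAM being whatever your stream is actually named):
--Returns FALSE once the stream has no pending change records left
SELECT SYSTEM$STREAM_HAS_DATA('TABLE_STREAM')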