I am using the Snowflake Kafka Sink Connector to ingest data from Debezium into a Snowflake table. I have created a Stream and a Task on this table. As the data from Kafka lands in the source table, the stream gets populated and the task runs a MERGE command to write the data into a final table.
However, as the stream has grown moderately large with about 50 million rows, the task fails to run to completion and times out.
To address this, I have tried the following:
The task still doesn't finish after 24 hours and times out.
Does ingesting 50M rows simply require an even larger warehouse? How do I get the task to run to completion?
MERGE statement
MERGE INTO TARGET.MESSAGE AS P
USING (SELECT RECORD_CONTENT:payload:before.id::VARCHAR AS BEFORE_ID,
RECORD_CONTENT:payload:before.agency_id::VARCHAR AS BEFORE_AGENCY_ID,
RECORD_CONTENT:payload:after.id::VARCHAR AS AFTER_ID,
RECORD_CONTENT:payload:after.agency_id::VARCHAR AS AFTER_AGENCY_ID,
RECORD_CONTENT:payload:after::VARIANT AS PAYLOAD,
RECORD_CONTENT:payload:source.ts_ms::INT AS TS_MS,
RECORD_CONTENT:payload:op::VARCHAR AS OP
FROM RAW.MESSAGE_STREAM
QUALIFY ROW_NUMBER() OVER (
PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
ORDER BY TS_MS DESC
) = 1) PS ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
(P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
WHEN MATCHED AND PS.OP = 'd' THEN DELETE
WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN INSERT (P.ID, P.AGENCY_ID, P.PAYLOAD, P.TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);
EXPLAIN
PlanGlobalStats:
partitionsTotal=742
partitionsAssigned=742
bytesAssigned=3596441600
Operations:
1:0 ->Result number of rows inserted, number of rows updated, number of rows deleted
1:1 ->WindowFunction ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST)
1:2 ->LeftOuterJoin joinFilter: ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id'))))) OR ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id')))))
1:3 ->Filter ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST) = 1
1:4 ->UnionAll
1:5 ->Filter CHANGES.A_METADATA$ACTION IS NOT NULL
1:6 ->WithReference
1:7 ->WithClause CHANGES
1:8 ->Filter (A.METADATA$SHORTNAME IS NULL) OR (D.METADATA$SHORTNAME IS NULL) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_METADATA, SCAN_FDN_FILES.RECORD_METADATA))) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_CONTENT, SCAN_FDN_FILES.RECORD_CONTENT)))
1:9 ->FullOuterJoin joinKey: (D.METADATA$ROW_ID = A.METADATA$ROW_ID) AND (D.METADATA$SHORTNAME = A.METADATA$SHORTNAME)
1:10 ->TableScan DATABASE.RAW.MESSAGE as SCAN_FDN_FILES METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER {partitionsTotal=17, partitionsAssigned=17, bytesAssigned=20623360}
1:11 ->TableScan DATABASE.RAW.MESSAGE as SCAN_FDN_FILES METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER {partitionsTotal=507, partitionsAssigned=507, bytesAssigned=3519694336}
1:12 ->Filter CHANGES.D_METADATA$ACTION IS NOT NULL
1:13 ->WithReference
1:14 ->TableScan DATABASE.TARGET.MESSAGE as P ID, AGENCY_ID {partitionsTotal=218, partitionsAssigned=218, bytesAssigned=56123904}
I have re-jigged your SQL just so it's more readable to me.
MERGE INTO target.message AS p
USING (
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op,
COALESCE(after_id, before_id) AS id_a,
COALESCE(after_agency_id, before_agency_id) AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1
) AS ps
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
(p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
I don't see anything super horrible here. I pushed the COALESCE of the two values used in the QUALIFY into the SELECT just so it is simpler to read.
But looking at the ON logic, you are prepared to match on the before values if the after values don't match. Combined with the COALESCE logic, that raises a question: are the two after values always null at the same time? That is, if after_id is null, will after_agency_id also be null? If so, and if you ALSO don't care about checking the before values when the after values are non-null but don't match, then you could use:
ON p.id = ps.id_a AND p.agency_id = ps.id_b
albeit you might want to name them better then. That should improve it a smidge.
Back to the JOIN logic: another reason I think the above might apply is that you are partitioning the ROW_NUMBER by the after values when they are present. That implies that if you had rows with the same after values but different before values, the latter might already be getting thrown away by the current ROW_NUMBER.
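One way to check that assumption before changing the join is to count the change events whose key actually differs between the before and after images. This is only a sketch that reuses the path expressions from your query. A plain SELECT on a stream should not advance its offset, but trying it against a clone first is the cautious option.

```sql
-- Count change events where the (id, agency_id) key differs between the
-- before and after images. Rows where either side is NULL (creates, deletes)
-- drop out, because a comparison with NULL is not TRUE.
-- If this returns 0, matching on the COALESCEd key should behave the same
-- as the OR-based ON clause for this batch.
SELECT COUNT(*) AS key_change_rows
FROM raw.message_stream
WHERE record_content:payload:before.id::VARCHAR <> record_content:payload:after.id::VARCHAR
   OR record_content:payload:before.agency_id::VARCHAR <> record_content:payload:after.agency_id::VARCHAR;
```

A non-zero count means some rows really do change key, and the OR-based ON clause (or some explicit handling of those rows) is probably still needed.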
But otherwise it doesn't look like it's doing anything "truly bad", at which point you might want to run a 4-8 times bigger warehouse, let it run for 24/8 hours, and see if it completes within that plus 10% extra time. The cost of the bigger warehouse should be offset by the shorter wall-clock time.
On the smaller data set you mention, try the SQL made really nice and simple:
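Trying the bigger warehouse is a small change. A sketch, where merge_wh is a placeholder for whatever warehouse the task runs on and the sizes are only examples. Each size step roughly doubles the compute, so "4-8 times bigger" is two or three sizes up from where you are now.

```sql
-- merge_wh is a hypothetical name; substitute the warehouse your task uses.
ALTER WAREHOUSE merge_wh SET WAREHOUSE_SIZE = 'XLARGE';

-- ... run the MERGE (or let the task run) and note the elapsed time ...

-- Scale back down afterwards so the larger size is only billed while testing.
-- 'MEDIUM' is an example; use whatever size the warehouse had before.
ALTER WAREHOUSE merge_wh SET WAREHOUSE_SIZE = 'MEDIUM';
```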
MERGE INTO target.message AS p
USING (
SELECT
b.before_id,
b.before_agency_id,
b.after_id,
b.after_agency_id,
b.payload,
b.ts_ms,
b.op
FROM (
SELECT
A.*
,COALESCE(a.after_id, a.before_id) AS id_a
,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
FROM (
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op
FROM raw.message_stream
) AS a
) AS B
WHERE b.rn = 1
) AS ps
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
(p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
And with the joins as I suspect would work for your data, run on cloned tables, just to see what the performance impact is:
MERGE INTO target.message AS p
USING (
SELECT
--b.before_id,
--b.before_agency_id,
b.after_id,
b.after_agency_id,
b.payload,
b.ts_ms,
b.op,
b.id_a,
b.id_b
FROM (
SELECT
A.*
,COALESCE(a.after_id, a.before_id) AS id_a
,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
FROM (
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op
FROM raw.message_stream
) AS a
) AS B
WHERE b.rn = 1
) AS ps
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
Split the task into two steps, just for now. Make a temp table that is the first half:
CREATE TABLE perm_but_call_temp_table AS
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op,
COALESCE(after_id, before_id) AS id_a,
COALESCE(after_agency_id, before_agency_id) AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1;
Then merge that into your main table:
MERGE INTO target.message AS p
USING perm_but_call_temp_table AS ps
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
This will give you an idea of where the problem is: the first or the second operation. It will also let you merge into clones and test whether the equi-join version runs faster and whether the results are the same.
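A sketch of that comparison, assuming the staging table from the first step already exists. The clone and test-stream names are made up for illustration. One thing to verify first: as far as I know, CREATE TABLE ... AS SELECT from a stream counts as consuming it, so on a live stream you may prefer to run the first step against a second stream created at the same offset.

```sql
-- Optional: a throwaway stream at the same offset as the real one, so the
-- experiment does not advance raw.message_stream (hypothetical name).
CREATE STREAM raw.message_stream_test ON TABLE raw.message
  AT (STREAM => 'raw.message_stream');

-- Zero-copy clones of the target, one per join variant.
CREATE TABLE target.message_or_join CLONE target.message;
CREATE TABLE target.message_equi_join CLONE target.message;

-- Run the OR-join MERGE into target.message_or_join and the equi-join MERGE
-- into target.message_equi_join, both USING perm_but_call_temp_table.
-- Then compare in both directions; no rows from either query means the two
-- variants agree on these columns.
SELECT id, agency_id, ts_ms FROM target.message_or_join
MINUS
SELECT id, agency_id, ts_ms FROM target.message_equi_join;

SELECT id, agency_id, ts_ms FROM target.message_equi_join
MINUS
SELECT id, agency_id, ts_ms FROM target.message_or_join;
```

MINUS compares distinct rows, so this will not notice a difference that is purely duplicate rows; a COUNT(*) on each clone is a cheap extra check.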