Tags: snowflake-cloud-data-platform, debezium, snowflake-connector

How to ingest a large Stream table using Snowflake Task?


I am using the Snowflake Kafka Sink Connector to ingest data from Debezium into a Snowflake table. I have created a Stream and a Task on this table. As data from Kafka lands in the source table, the stream is populated and the task runs a MERGE command to write the data into a final table.
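
The stream and task are wired up along these lines (a simplified sketch; the task name, warehouse, and schedule are placeholders rather than the exact DDL):

-- Stream on the landing table written by the Kafka connector.
CREATE OR REPLACE STREAM RAW.MESSAGE_STREAM ON TABLE RAW.MESSAGE;

-- Task that drains the stream into the final table whenever it has data.
CREATE OR REPLACE TASK RAW.MESSAGE_TASK
    WAREHOUSE = INGEST_WH
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('RAW.MESSAGE_STREAM')
AS
    MERGE INTO TARGET.MESSAGE ... ;  -- placeholder: the full MERGE statement is shown in the "MERGE statement" section below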

However, as the stream has grown moderately large with about 50 million rows, the task fails to run to completion and times out.
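
The timeouts are visible in the task history, e.g. (MESSAGE_TASK stands in for the actual task name):

-- Recent runs of the task; timed-out runs show STATE = 'FAILED' with a timeout error.
SELECT NAME, STATE, ERROR_MESSAGE, SCHEDULED_TIME, COMPLETED_TIME
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'MESSAGE_TASK'))
ORDER BY SCHEDULED_TIME DESC;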

To address this, I have tried the following:

  1. Increase the timeout of the task from 1 hour to 24 hours.
  2. Increase the warehouse size to Medium.

The task still doesn't finish after 24 hours and times out.
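
For reference, those two changes look roughly like this (MESSAGE_TASK and INGEST_WH stand in for the actual task and warehouse names):

-- 1. Raise the task timeout from the 1-hour default to 24 hours (value is in milliseconds).
ALTER TASK RAW.MESSAGE_TASK SUSPEND;
ALTER TASK RAW.MESSAGE_TASK SET USER_TASK_TIMEOUT_MS = 86400000;
ALTER TASK RAW.MESSAGE_TASK RESUME;

-- 2. Bump the warehouse the task runs on to Medium.
ALTER WAREHOUSE INGEST_WH SET WAREHOUSE_SIZE = 'MEDIUM';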

Does ingesting 50M rows simply require an even larger warehouse? How do I get the task to run to completion?

MERGE statement

MERGE INTO TARGET.MESSAGE AS P
        USING (SELECT RECORD_CONTENT:payload:before.id::VARCHAR          AS BEFORE_ID,
                      RECORD_CONTENT:payload:before.agency_id::VARCHAR   AS BEFORE_AGENCY_ID,
                      RECORD_CONTENT:payload:after.id::VARCHAR           AS AFTER_ID,
                      RECORD_CONTENT:payload:after.agency_id::VARCHAR    AS AFTER_AGENCY_ID,
                      RECORD_CONTENT:payload:after::VARIANT              AS PAYLOAD,
                      RECORD_CONTENT:payload:source.ts_ms::INT           AS TS_MS,
                      RECORD_CONTENT:payload:op::VARCHAR                 AS OP
               FROM RAW.MESSAGE_STREAM
                   QUALIFY ROW_NUMBER() OVER (
                       PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
                       ORDER BY TS_MS DESC
                       ) = 1) PS ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
                                    (P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
        WHEN MATCHED AND PS.OP = 'd' THEN DELETE
        WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
        WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN INSERT (P.ID, P.AGENCY_ID, P.PAYLOAD, P.TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);

EXPLAIN Plan

GlobalStats:
    partitionsTotal=742
    partitionsAssigned=742
    bytesAssigned=3596441600
Operations:
1:0     ->Result  number of rows inserted, number of rows updated, number of rows deleted  
1:1          ->WindowFunction  ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST)  
1:2               ->LeftOuterJoin  joinFilter: ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id'))))) OR ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id')))))  
1:3                    ->Filter  ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST) = 1  
1:4                         ->UnionAll  
1:5                              ->Filter  CHANGES.A_METADATA$ACTION IS NOT NULL  
1:6                                   ->WithReference  
1:7                                        ->WithClause  CHANGES  
1:8                                             ->Filter  (A.METADATA$SHORTNAME IS NULL) OR (D.METADATA$SHORTNAME IS NULL) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_METADATA, SCAN_FDN_FILES.RECORD_METADATA))) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_CONTENT, SCAN_FDN_FILES.RECORD_CONTENT)))  
1:9                                                  ->FullOuterJoin  joinKey: (D.METADATA$ROW_ID = A.METADATA$ROW_ID) AND (D.METADATA$SHORTNAME = A.METADATA$SHORTNAME)  
1:10                                                       ->TableScan  DATABASE.RAW.MESSAGE as SCAN_FDN_FILES  METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER  {partitionsTotal=17, partitionsAssigned=17, bytesAssigned=20623360}
1:11                                                       ->TableScan  DATABASE.RAW.MESSAGE as SCAN_FDN_FILES  METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER  {partitionsTotal=507, partitionsAssigned=507, bytesAssigned=3519694336}
1:12                              ->Filter  CHANGES.D_METADATA$ACTION IS NOT NULL  
1:13                                   ->WithReference  
1:14                    ->TableScan  DATABASE.TARGET.MESSAGE as P  ID, AGENCY_ID  {partitionsTotal=218, partitionsAssigned=218, bytesAssigned=56123904}

Query Profile



Solution

  • I have re-jiggled your SQL just so it's more readable to me:

    MERGE INTO target.message AS p
        USING (
        SELECT 
            record_content:payload:before.id::VARCHAR          AS before_id,
            record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
            record_content:payload:after.id::VARCHAR           AS after_id,
            record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
            record_content:payload:after::VARIANT              AS payload,
            record_content:payload:source.ts_ms::INT           AS ts_ms,
            record_content:payload:op::VARCHAR                 AS op,
            COALESCE(after_id, before_id) AS id_a,
            COALESCE(after_agency_id, before_agency_id) AS id_b
        FROM raw.message_stream
        QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1
    ) AS ps 
        ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
           (p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
    WHEN MATCHED AND ps.op = 'd' 
        THEN DELETE
    WHEN MATCHED AND ps.op IN ('u', 'r') 
        THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
    WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
        THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
            VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
    

    I don't see anything super horrible here. I pushed the COALESCE of the two values used in the QUALIFY into the SELECT just so it reads more simply.

    But looking at the ON logic, you are prepared to match on the before values if the after values don't match. Mixing that with the COALESCE logic raises a question: are the two after values always null at the same time, i.e. if after_id is null, is after_agency_id also null? And do you also not care about checking "before" when the "after" values are present but don't match? If both of those hold, you could use:

    ON p.id = ps.id_a AND p.agency_id = ps.id_b
    

    albeit you might want to name them better in that case. That should improve it a smidge.

    Back to the JOIN logic, another reason I think the above might apply is that you are partitioning the ROW_NUMBER by the after values when they are present, which implies that if you had rows with the same after values but different before values, the current ROW_NUMBER dedupe would throw some of them away.
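
    A contrived, single-key illustration of that effect (made-up values, not your data):

    -- Two change records share the same "after" key ('2') but carry different "before" keys.
    -- The QUALIFY keeps only the newest row per id_a, so the ('1','2') pairing is silently dropped.
    WITH sample (before_id, after_id, ts_ms) AS (
        SELECT * FROM VALUES ('1', '2', 100), ('9', '2', 200)
    )
    SELECT before_id, after_id, ts_ms,
           COALESCE(after_id, before_id) AS id_a
    FROM sample
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a ORDER BY ts_ms DESC) = 1;
    -- returns only the ('9', '2', 200) row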

    But otherwise it doesn't look like it's doing anything "truly bad", at which point you might want to run a 4-8 times bigger warehouse, let it run for roughly 24/8 hours, and see if it completes in about that time plus 10%. The cost of the bigger warehouse should be offset by the much smaller wall-clock time.
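
    Sizing up 4-8x from a Medium means X-Large or 2X-Large, e.g. (INGEST_WH is a made-up warehouse name):

    -- Medium -> X-Large is 4x the compute, Medium -> 2X-Large is 8x (4 -> 16 -> 32 credits/hour).
    ALTER WAREHOUSE INGEST_WH SET WAREHOUSE_SIZE = '2X-LARGE';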

    Silly Idea:

    on the smaller data set you mention, try the SQL made real nice and simple:

    MERGE INTO target.message AS p
    USING (
        SELECT 
            b.before_id,
            b.before_agency_id,
            b.after_id,
            b.after_agency_id,
            b.payload,
            b.ts_ms,
            b.op
        FROM (
            SELECT 
                A.*
                ,COALESCE(a.after_id, a.before_id) AS id_a
                ,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
                ,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
            FROM (
                SELECT 
                    record_content:payload:before.id::VARCHAR          AS before_id,
                    record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
                    record_content:payload:after.id::VARCHAR           AS after_id,
                    record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
                    record_content:payload:after::VARIANT              AS payload,
                    record_content:payload:source.ts_ms::INT           AS ts_ms,
                    record_content:payload:op::VARCHAR                 AS op
                FROM raw.message_stream
            ) AS A
        ) AS B
        WHERE b.rn = 1   
    ) AS ps 
        ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
           (p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
    WHEN MATCHED AND ps.op = 'd' 
        THEN DELETE
    WHEN MATCHED AND ps.op IN ('u', 'r') 
        THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
    WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
        THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
            VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
    

    and with the join changed as I suspect would work for your data, run against cloned tables, just to see what the performance impact is:

    MERGE INTO target.message AS p
    USING (
        SELECT 
            --b.before_id,
            --b.before_agency_id,
            b.after_id,
            b.after_agency_id,
            b.payload,
            b.ts_ms,
            b.op,
            b.id_a,
            b.id_b
        FROM (
            SELECT 
                A.*
                ,COALESCE(a.after_id, a.before_id) AS id_a
                ,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
                ,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
            FROM (
                SELECT 
                    record_content:payload:before.id::VARCHAR          AS before_id,
                    record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
                    record_content:payload:after.id::VARCHAR           AS after_id,
                    record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
                    record_content:payload:after::VARIANT              AS payload,
                    record_content:payload:source.ts_ms::INT           AS ts_ms,
                    record_content:payload:op::VARCHAR                 AS op
                FROM raw.message_stream
            ) AS A
        ) AS B
        WHERE b.rn = 1   
    ) AS ps 
        ON p.id = ps.id_a AND p.agency_id = ps.id_b
    WHEN MATCHED AND ps.op = 'd' 
        THEN DELETE
    WHEN MATCHED AND ps.op IN ('u', 'r') 
        THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
    WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
        THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
            VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
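
    If you want to run that comparison on clones rather than the real table, a minimal sketch (zero-copy clone; the _clone name is made up):

    -- Clone the target so each MERGE variant can be tested against identical starting data,
    -- without touching the real table.
    CREATE TABLE target.message_clone CLONE target.message;
    -- then point the MERGE at target.message_clone instead of target.message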
    

    Another thing to try, to "get through the backlog":

    Split the task into two steps, just for now: make a temp table that is the first half:

    CREATE TABLE perm_but_call_temp_table AS
       SELECT 
            record_content:payload:before.id::VARCHAR          AS before_id,
            record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
            record_content:payload:after.id::VARCHAR           AS after_id,
            record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
            record_content:payload:after::VARIANT              AS payload,
            record_content:payload:source.ts_ms::INT           AS ts_ms,
            record_content:payload:op::VARCHAR                 AS op,
            COALESCE(after_id, before_id) AS id_a,
            COALESCE(after_agency_id, before_agency_id) AS id_b
        FROM raw.message_stream
        QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1
    

    then merge that into your main table.

    MERGE INTO target.message AS p
    USING perm_but_call_temp_table AS ps 
        ON p.id = ps.id_a AND p.agency_id = ps.id_b
    WHEN MATCHED AND ps.op = 'd' 
        THEN DELETE
    WHEN MATCHED AND ps.op IN ('u', 'r') 
        THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
    WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
        THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
            VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
    

    which will give you an idea of "where the problem is": the first or the second operation. It will also let you merge into clones, and test whether the equi-join version runs faster and the results are the same.
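
    To check the "results are the same" part, a quick sketch, assuming the equi-join MERGE ran against a clone named target.message_clone (a made-up name):

    -- Each MINUS should return zero rows if the two MERGE variants produced identical tables.
    -- payload is cast to STRING so the VARIANT column compares as plain text.
    SELECT id, agency_id, payload::STRING AS payload_str, ts_ms FROM target.message
    MINUS
    SELECT id, agency_id, payload::STRING, ts_ms FROM target.message_clone;

    SELECT id, agency_id, payload::STRING AS payload_str, ts_ms FROM target.message_clone
    MINUS
    SELECT id, agency_id, payload::STRING, ts_ms FROM target.message;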