Tags: apache-kafka, apache-flink, flink-streaming, flink-sql

Efficient Flink SQL for a SUM over the last 24 hours and the 24 hours before that, over CDC events?


I have a Kafka source with CDC (Debezium) events. I have tried various aggregation approaches (TVF HOP windows, OVER aggregation, GROUP BY HOP windows, etc.), but each has its own problems: it either does not support CDC input, or it does not update as new events come in.

The current approach at least works, but it has a lot of overhead (redundant HOP calculations):


-- Rolling 24h stats per pool: rank the HOP window results per pool and
-- keep only the most recent one (rn = 1).
CREATE VIEW last_1_day_volumes AS
SELECT *, PROCTIME() AS proc_time FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            -- 24-hour windows sliding every minute
            HOP_ROWTIME(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_time
        FROM
            swaps_stream_day
        WHERE
            -- limit the initial scan to the last 24 hours of data
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '24' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '24' HOUR)
    )
) WHERE rn = 1;

-- Same stats over 24-hour windows sliding every 10 minutes; rn = 6 keeps
-- the sixth most recent window per pool.
CREATE VIEW prev_1_day_volumes AS
SELECT *, PROCTIME() AS proc_time FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            -- 24-hour windows sliding every 10 minutes
            HOP_ROWTIME(`rowtime`, INTERVAL '10' MINUTE, INTERVAL '24' HOUR) AS window_time
        FROM
            swaps_stream_day
        WHERE
            -- limit the initial scan to the last 48 hours of data
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '48' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '10' MINUTE, INTERVAL '24' HOUR)
    )
) WHERE rn = 6;

This means that on initial deployment the aggregation nodes are extremely busy just computing irrelevant HOP windows. Is there a more efficient way (in terms of memory usage and initial scan time) to do this in Flink SQL?

I tried these alternatives with no luck:

  • CUMULATE / TUMBLE windowing: we want rolling stats over the last 24 hours, not tumbling buckets or windows that always start at 00:00.
  • OVER aggregation: it does not work with CDC (update/delete) streams. Even when I forced an append-only stream, the results were not updated when no new events arrived, and amounts were never subtracted once events fell out of the 24-hour window (as suggested in "How can I get the moving sum of streaming events?"); see the sketch after this list.
  • Basic WHERE: as I understand it, a plain SUM(amount) ... WHERE `rowtime` is within the last 24 hours would likewise never subtract events once they leave the window.
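
For reference, the OVER aggregation attempt looked roughly like this (a sketch against the same swaps_stream_day table; the window name w is illustrative). Flink rejects it on changelog input, and on an append-only copy of the stream it only emits a result when a new row arrives, so the totals never shrink as old events age out:

    SELECT
        poolId,
        -- one result per input row; the 24h range is relative to that row's
        -- rowtime, and already-emitted results are never updated or retracted
        SUM(amountUSD) OVER w AS totalVolumeUSD,
        COUNT(*) OVER w AS totalVolumeSwaps
    FROM swaps_stream_day
    WINDOW w AS (
        PARTITION BY poolId
        ORDER BY `rowtime`
        RANGE BETWEEN INTERVAL '24' HOUR PRECEDING AND CURRENT ROW
    );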

Solution

  • FLINK-20281 ("Window aggregation supports changelog input stream") will be included in Flink 1.19. This addresses your use case directly.

    The feature is on the master branch now, but it won't be released for a few weeks, until the 1.19 release is finalized.

    I can't really think of a good workaround.

    https://issues.apache.org/jira/browse/FLINK-20281
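
    Once on 1.19, the rolling views should be expressible as window TVF aggregations directly over the CDC stream. A minimal sketch, assuming the table and columns from the question and that FLINK-20281 has shipped:

        -- hypothetical until FLINK-20281 is released: HOP TVF over changelog input
        SELECT
            poolId,
            window_start,
            window_end,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps
        FROM TABLE(
            HOP(TABLE swaps_stream_day, DESCRIPTOR(`rowtime`),
                INTERVAL '1' MINUTE, INTERVAL '24' HOUR))
        -- window TVF aggregations must group by window_start and window_end
        GROUP BY poolId, window_start, window_end;

    With changelog input supported, the windows themselves handle updates and deletes, so the ROW_NUMBER deduplication layer from the question should no longer be needed.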