Tags: apache-kafka · apache-flink · flink-streaming · flink-sql

How to advance the watermark in Flink SQL for a Kafka source, even if no new data arrives in any of the partitions?


I have a simple Flink (v1.17) SQL streaming job that uses Kafka as its source. I have configured a few watermark-related settings, but I can't figure out how to force the watermark to advance when no events are coming in.

The final goal is to calculate the last-1-hour SUM of a certain field (amount). Here are the relevant parts of the Flink job (in Java) and the SQL statements I'm running:

public class StreamingJob {

// ... 
public static void main(String[] args) throws Exception {
    // ...
    List<String> sqlStatements = ...

    final EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

    streamEnv.getConfig().setAutoWatermarkInterval(1000);
    streamEnv.getConfig().setLatencyTrackingInterval(10000);

    tableEnv.getConfig().getConfiguration().setString("pipeline.auto-watermark-interval", "5000ms");
    tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "5000ms");
    tableEnv.getConfig().getConfiguration().setString("table.exec.emit.allow-lateness", "1d");

    for (String statement : sqlStatements) {
        tableEnv.executeSql(statement);
    }

    streamEnv.execute(jobId);
}
}

CREATE TABLE trades_stream_by_hour (
    `entityId` STRING,
    `poolId` STRING,
    `amountUSD` FLOAT,
    `blockNumber` BIGINT,
    `timestamp` BIGINT,
    `rowtime` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic' = 'trades',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1705851977000', -- 2 hours ago in unix timestamp milliseconds
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true',
    'debezium-json.map-null-key.mode' = 'DROP',
    'debezium-json.encode.decimal-as-plain-number' = 'true'
);

-- Suppressed the "my_sink" CREATE TABLE (jdbc sink table) statement for brevity
-- Suppressed the "pools_store" CREATE TABLE (jdbc lookup table) statement for brevity

CREATE VIEW last_1_hour_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            HOP_ROWTIME(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) as window_time
        FROM
            trades_stream_by_hour
        WHERE
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '1' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
    )
) WHERE rn = 1;

CREATE VIEW prev_1_hour_volumes AS
SELECT *, PROCTIME() as proc_time FROM (
    SELECT 
        *,
        ROW_NUMBER() OVER (
            PARTITION BY poolId
            ORDER BY window_time DESC
        ) AS rn
    FROM (
        SELECT
            poolId,
            SUM(COALESCE(amountUSD, 0)) AS totalVolumeUSD,
            COUNT(*) AS totalVolumeSwaps,
            MAX(`timestamp`) AS `timestamp`,
            MAX(`blockNumber`) AS `maxBlockNumber`,
            MIN(`blockNumber`) AS `minBlockNumber`,
            HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as window_time
        FROM
            trades_stream_by_hour
        WHERE
            amountUSD IS NOT NULL AND amountUSD > 0 AND `rowtime` >= CAST(
                CURRENT_TIMESTAMP - INTERVAL '2' HOUR AS TIMESTAMP(3)
            )
        GROUP BY
            poolId,
            HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
    )
) WHERE rn = 12;

INSERT INTO
    my_sink
SELECT
    COALESCE(lv1h.poolId, pv1h.poolId) as entityId,
    lv1h.totalVolumeSwaps as volumeSwaps1h,
    lv1h.minBlockNumber as volumeMinBlock1h,
    lv1h.maxBlockNumber as volumeMaxBlock1h,
    lv1h.totalVolumeUSD as volumeUSD1h,
    lv1h.`timestamp` as volumeUSDTimestamp1h,
    pv1h.totalVolumeSwaps as prevVolumeSwaps1h,
    pv1h.minBlockNumber as prevVolumeMinBlock1h,
    pv1h.maxBlockNumber as prevVolumeMaxBlock1h,
    pv1h.totalVolumeUSD as prevVolumeUSD1h,
    pv1h.`timestamp` as prevVolumeUSDTimestamp1h,
    lv1h.totalVolumeUSD - pv1h.totalVolumeUSD as volumeUSDChange1h,
    (lv1h.totalVolumeUSD - pv1h.totalVolumeUSD) / pv1h.totalVolumeUSD as volumeUSDChangePercent1h
FROM last_1_hour_volumes lv1h
LEFT JOIN prev_1_hour_volumes AS pv1h ON lv1h.poolId = pv1h.poolId
LEFT JOIN pools_store FOR SYSTEM_TIME AS OF lv1h.proc_time AS pool ON pool.entityId = lv1h.poolId
WHERE
    -- The goal here is to avoid writing any data before 1 hour ago
    (CURRENT_WATERMARK(lv1h.window_time) IS NOT NULL AND CURRENT_WATERMARK(lv1h.window_time) >= CAST(CURRENT_TIMESTAMP - INTERVAL '5' MINUTES AS TIMESTAMP(3))) OR
    (CURRENT_WATERMARK(pv1h.window_time) IS NOT NULL AND CURRENT_WATERMARK(pv1h.window_time) >= CAST(CURRENT_TIMESTAMP - INTERVAL '5' MINUTES AS TIMESTAMP(3)))

The observation: as soon as a new trade arrives, the job writes the expected values. But if, for example, no new trades arrive for 20 minutes, the "last 1 hour" totals should still be updated (old trades should age out of the window), yet the watermark seems to be stuck at the timestamp of the last event.

I'm wondering whether the Kafka source is not advancing the watermark, whether something is improperly configured, or whether the problem lies in the way my query calculates the last-1-hour and previous-1-hour SUMs.

I have tried these configurations, but none of them helps:

  • Setting setAutoWatermarkInterval() to 5s
  • Setting pipeline.auto-watermark-interval to 5s
  • Setting table.exec.source.idle-timeout to 5s
  • Setting table.exec.emit.allow-lateness to 5s
  • Setting both parallelism and partitions to 1

Solution

  • If there are no events whatsoever, then it's not possible, given the built-in watermark support, to trigger the final output: the watermark will never advance past the timestamp of the last event (with your 5-second bound, it stalls 5 seconds behind it). Note that table.exec.source.idle-timeout only helps when some partitions are idle while at least one partition is still producing; it cannot advance the watermark once every partition is idle. You have a few options:

    (1) Use stop with drain to definitively stop the job(s). This will send through a watermark with a huge timestamp that will fire all pending timers and close all open windows.
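    For example, with the Flink CLI (the savepoint path and job id below are placeholders):

        # Takes a savepoint, then sends a final MAX_WATERMARK through the pipeline,
        # firing all pending event-time timers and closing all open windows.
        ./bin/flink stop --drain --savepointPath /tmp/savepoints <job-id>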

    (2) Run the queries in batch mode, or use a source that supports bounded streaming, such as Kafka. See https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#scan-bounded-mode.
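    For instance, assuming a Kafka connector version that supports the scan.bounded.mode option, the source table from the question could be made bounded like this (only the connector options are shown; the columns and WATERMARK definition stay the same):

        CREATE TABLE trades_stream_bounded (
            -- same columns and WATERMARK definition as trades_stream_by_hour
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'trades',
            'scan.startup.mode' = 'timestamp',
            'scan.startup.timestamp-millis' = '1705851977000',
            -- stop at the offsets that are current when the job starts
            'scan.bounded.mode' = 'latest-offset',
            'format' = 'debezium-json'
        );

    When a bounded source reaches its end offsets it emits one final MAX_WATERMARK, so all pending windows fire before the job finishes.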

    (3) Using the DataStream API, implement a custom watermark strategy that uses a processing time timer to detect when all the sources have become idle, and arrange for it to advance the watermark. After converting the streams to tables, you'd then need to configure your SQL tables to use SOURCE_WATERMARK() as the SQL WATERMARK.
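    Here is a minimal sketch of that idea. It assumes a hypothetical Trade POJO whose timestamp field holds epoch seconds (as in your DDL) and a KafkaSource<Trade> built elsewhere; the class name IdleAdvancingWatermarks and the idle threshold are illustrative, not Flink APIs:

        import org.apache.flink.api.common.eventtime.Watermark;
        import org.apache.flink.api.common.eventtime.WatermarkGenerator;
        import org.apache.flink.api.common.eventtime.WatermarkOutput;
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.table.api.Schema;
        import org.apache.flink.table.api.Table;

        // Bounded out-of-orderness while events flow; once no event has arrived
        // for IDLE_TIMEOUT_MS, the watermark starts tracking the wall clock so
        // that pending event-time windows still fire.
        public class IdleAdvancingWatermarks implements WatermarkGenerator<Trade> {

            private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000; // matches the 5s in the DDL
            private static final long IDLE_TIMEOUT_MS = 60_000;        // illustrative threshold

            private long maxTimestamp = Long.MIN_VALUE;
            private long lastEventWallClock = System.currentTimeMillis();

            @Override
            public void onEvent(Trade event, long eventTimestamp, WatermarkOutput output) {
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                lastEventWallClock = System.currentTimeMillis();
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                if (maxTimestamp == Long.MIN_VALUE) {
                    return; // nothing seen yet, nothing to advance from
                }
                long watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
                long idleFor = System.currentTimeMillis() - lastEventWallClock;
                if (idleFor > IDLE_TIMEOUT_MS) {
                    // Idle source: let the watermark follow processing time instead.
                    watermark = Math.max(watermark, System.currentTimeMillis() - IDLE_TIMEOUT_MS);
                }
                output.emitWatermark(new Watermark(watermark));
            }
        }

        // Inside main(), after creating streamEnv and tableEnv as in the question:
        WatermarkStrategy<Trade> strategy =
            WatermarkStrategy.forGenerator(ctx -> new IdleAdvancingWatermarks())
                .withTimestampAssigner((trade, ts) -> trade.timestamp * 1000L);

        Table trades = tableEnv.fromDataStream(
            streamEnv.fromSource(kafkaSource, strategy, "trades"),
            Schema.newBuilder()
                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                .watermark("rowtime", "SOURCE_WATERMARK()")
                .build());

    The trade-off: once the idle branch kicks in, results are effectively emitted on processing time, so events that arrive after a long silence with older timestamps may be dropped as late.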