
Using the Flink SQL Kafka connector to run a session window aggregation


CREATE TABLE server_logs_v1_k ( 
    userid STRING, 
    log_time TIMESTAMP_LTZ(3),
    WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS
) WITH (
    'connector' = 'kafka',
    'topic-pattern' = 'server_logs_v1',
    'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
    'properties.group.id' = 'serverlogs_v1_local_cg_1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval' = '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
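A quick way to check what the WATERMARK clause above produces at runtime is to print each row's event time next to the current watermark. This is a diagnostic sketch, not part of the original question. It assumes the CURRENT_WATERMARK built-in function, which is available since Flink 1.15. The column alias current_wm is illustrative.

```sql
-- Diagnostic sketch: show each row's event time next to the watermark
-- that was current when the row was processed.
-- CURRENT_WATERMARK returns NULL until a first watermark has been emitted.
SELECT
    userid,
    log_time,
    CURRENT_WATERMARK(log_time) AS current_wm
FROM server_logs_v1_k;
```

With only the three sample messages, current_wm would be expected to stay NULL or at most 16:42:55 UTC on 2023-07-07 (the largest log_time minus the 5-second bound), shown in the session time zone.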

I have the above table and I am trying to run this query:

select userid, count(userid) from server_logs_v1_k group by userid,session(log_time, INTERVAL '10' SECOND);

I have 3 messages in the Kafka topic server_logs_v1:

{"userid": "1","log_time": "2023-07-07T16:43:00.000Z"}
{"userid": "2","log_time": "2023-07-07T16:43:00.000Z"}
{"userid": "3","log_time": "2023-07-07T16:43:00.000Z"}

I do NOT see any results for select userid, count(userid) from server_logs_v1_k group by userid,session(log_time, INTERVAL '10' SECOND);

But if I remove the session window function and run select userid, count(userid) from server_logs_v1_k group by userid, I do see results.

The Flink version is 1.16.0.

How do I get a session window aggregation to work with the Kafka connector?


Solution

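  • Your Kafka topic doesn't contain any messages with a log_time later than 2023-07-07T16:43:00.000Z. Because no such message arrives, Flink's watermark cannot advance past that point (with your WATERMARK definition it stays at that time minus 5 seconds). As a result, Flink never learns that the session window, which ends 10 seconds after the last event, can be closed and emitted.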

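    You can either define an idle watermark strategy (in Flink SQL, the 'table.exec.source.idle-timeout' option), so that partitions without data stop holding the watermark back, or you need to include events whose log_time is late enough to push the watermark past the end of the session window.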
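Both options can be sketched in the SQL client as follows. The '10 s' idle timeout is an arbitrary assumption, and the record with userid "4" is a hypothetical event whose only purpose is to advance the watermark. The sample rows' session ends at 16:43:10, and the watermark lags 5 seconds behind the largest log_time. An event with a log_time of roughly 16:43:15 or later is therefore needed before the watermark passes the end of the session.

```sql
-- Option 1 (sketch): mark source partitions that receive no data as idle,
-- so they stop holding back the overall watermark. '10 s' is an arbitrary value.
SET 'table.exec.source.idle-timeout' = '10 s';

-- Option 2 (sketch): publish an event with a later log_time to server_logs_v1
-- using any Kafka producer, e.g. this hypothetical record:
-- {"userid": "4","log_time": "2023-07-07T16:43:15.000Z"}

-- Then re-run the session window aggregation.
SELECT userid, COUNT(userid) AS cnt
FROM server_logs_v1_k
GROUP BY userid, SESSION(log_time, INTERVAL '10' SECOND);
```

On a topic with several partitions, the watermark is the minimum across partitions. In that case the later event must reach every partition that holds data, or the two options need to be combined.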