CREATE TABLE server_logs_v1_k (
userid STRING,
log_time TIMESTAMP_LTZ(3),
WATERMARK FOR log_time AS log_time - INTERVAL '5' SECONDS
) WITH (
'connector' = 'kafka',
'topic-pattern' = 'server_logs_v1',
'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
'properties.group.id' = 'serverlogs_v1_local_cg_1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'scan.topic-partition-discovery.interval'= '60000',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
I have the above table and I am trying to run this query:
select userid, count(userid) from server_logs_v1_k group by userid, session(log_time, INTERVAL '10' SECOND);
I have 3 messages in the Kafka topic server_logs_v1:
{"userid": "1","log_time": "2023-07-07T16:43:00.000Z"}
{"userid": "2","log_time": "2023-07-07T16:43:00.000Z"}
{"userid": "3","log_time": "2023-07-07T16:43:00.000Z"}
I am NOT seeing any result for the query above, but if I remove the session window function and run
select userid, count(userid) from server_logs_v1_k group by userid
I do see results.
Flink version is 1.16.0.
How do I get session window aggregation to work with the Kafka connector?
Your Kafka topic doesn't contain any messages with a log_time later than 2023-07-07T16:43:00.000Z. Because no newer events arrive, Flink's watermark stays at that value (minus the 5-second watermark delay), so Flink never learns that the session window can be closed.
You can either define an idle watermark strategy, or include events with later timestamps that push the watermark forward.
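A sketch of both options, assuming Flink SQL 1.16. Option 1 uses the built-in `table.exec.source.idle-timeout` setting, which marks source partitions idle when they produce no data, so empty partitions stop holding back the overall watermark (useful if your topic has more partitions than messages). Note that if *all* partitions are idle, the watermark still won't advance, which is where option 2 comes in.

```sql
-- Option 1: in the SQL client, mark a source partition as idle
-- after 10 s without records, so it no longer stalls the watermark.
SET 'table.exec.source.idle-timeout' = '10 s';
```

Option 2 is simply producing one more event whose timestamp is far enough ahead to push the watermark past the end of the open session (session gap of 10 s plus the 5 s watermark delay). For example, publishing this message (the userid value here is arbitrary) to server_logs_v1 should cause the earlier sessions to close and emit results:

```json
{"userid": "1", "log_time": "2023-07-07T16:44:00.000Z"}
```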