I am new to KSQL and I am trying to get a count of the events in a topic, grouped per hour. Failing that, I would settle for counting all the events in the topic, and then changing the query later to work on a windowed basis. The event timestamp is in the ets field. To give more context, let's assume my topic is called messenger and the events are in JSON format. Here is a sample message:
{"name":"Won","message":"This message is from Won","ets":1642703358124}
Partition:0 Offset:69 Timestamp:1642703359427
First create a stream over the topic:
CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR)
WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
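Once the stream exists you can sanity-check that it is reading the topic correctly with a push query (a minimal sketch, using the column names from the stream definition above):

```sql
-- Read from the beginning of the topic rather than only new messages.
SET 'auto.offset.reset' = 'earliest';

-- Pull back a handful of rows to confirm the JSON is being parsed.
SELECT NAME, MESSAGE
  FROM my_stream
  EMIT CHANGES
  LIMIT 5;
```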
Then use a TUMBLING window aggregation and a dummy GROUP BY field:
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
AS WINDOW_START_TS,
COUNT(*) AS RECORD_CT
FROM my_stream
WINDOW TUMBLING (SIZE 1 HOURS)
GROUP BY 1
EMIT CHANGES;
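If you'd rather start with the simpler case from your question — a plain count of all events in the topic — the same pattern works without the WINDOW clause:

```sql
-- Running total of all events in the topic (no windowing).
-- GROUP BY 1 is the same dummy grouping as in the windowed query.
SELECT COUNT(*) AS RECORD_CT
  FROM my_stream
  GROUP BY 1
  EMIT CHANGES;
```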
If you want to override the default behaviour of using the Kafka message timestamp and instead use a custom timestamp field (I can see ets in your sample), you declare the field and set it as the timestamp in the stream definition:
CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR, ETS BIGINT)
WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON', TIMESTAMP='ets');
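You can verify which timestamp ksqlDB is using by inspecting the ROWTIME pseudocolumn, which reflects the TIMESTAMP property set on the stream:

```sql
-- ROWTIME should now show the value of the ets field rather than
-- the Kafka record timestamp.
SELECT ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
       NAME
  FROM my_stream
  EMIT CHANGES;
```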
For more background, see my blog: https://rmoff.net/2020/09/08/counting-the-number-of-messages-in-a-kafka-topic/