
Does anybody have a KSQL query that counts events in a topic on a per-hour basis?


I am new to KSQL and I am trying to count the events in a topic, grouped per hour. Failing that, I would settle for counting all the events in the topic; I could then change the query to work on a windowed basis. The timestamp is the … To give more context, let's assume my topic is called messenger and the events are in JSON format. Here is a sample message:

{"name":"Won","message":"This message is from Won","ets":1642703358124}
Partition:0 Offset:69 Timestamp:1642703359427

Solution

  • First, create a stream over the topic (replace my_topic with your topic name, e.g. messenger):

    CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR)
      WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON');
    

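    Then use a TUMBLING window aggregation with a dummy GROUP BY field (the constant 1), so that every record falls into a single group and COUNT(*) returns the total for each one-hour window: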

    SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London')
             AS WINDOW_START_TS,
           COUNT(*) AS RECORD_CT
      FROM my_stream
            WINDOW TUMBLING (SIZE 1 HOURS)
      GROUP BY 1
      EMIT CHANGES;
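To make the bucketing concrete, here is a small Python simulation (not ksqlDB code) of what a one-hour tumbling window with a constant GROUP BY computes. It assumes, as in Kafka Streams, that tumbling windows are aligned to the Unix epoch, so a record's window start is its timestamp rounded down to a multiple of the window size. The first timestamp is the sample record's Kafka timestamp; the other two are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000  # SIZE 1 HOURS, expressed in epoch milliseconds


def window_start(ts_ms: int, size_ms: int = HOUR_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def count_per_window(timestamps_ms, size_ms: int = HOUR_MS) -> Counter:
    """Rough equivalent of COUNT(*) ... WINDOW TUMBLING ... GROUP BY <constant>."""
    return Counter(window_start(ts, size_ms) for ts in timestamps_ms)


def fmt(ts_ms: int) -> str:
    """Format epoch millis as UTC (Europe/London is UTC+0 in January)."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    # Sample record's Kafka timestamp, plus two hypothetical later records.
    events = [1642703359427, 1642703999999, 1642705200000]
    for start, n in sorted(count_per_window(events).items()):
        print(fmt(start), n)
```

This prints one line per hour with its count (2 records in the 18:00 window, 1 in the 19:00 window). Unlike this batch sketch, the EMIT CHANGES query keeps emitting an updated count for a window each time a new record arrives in it.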
    

    By default, each record is assigned to a window using the Kafka message timestamp. If you want to use a custom timestamp field instead (I can see ets in your sample), set it in the stream definition. Since ets holds epoch milliseconds, it can be declared as BIGINT:

    CREATE STREAM my_stream (NAME VARCHAR, MESSAGE VARCHAR, ETS BIGINT)
      WITH (KAFKA_TOPIC='my_topic', FORMAT='JSON', TIMESTAMP='ets');
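Which timestamp you choose matters near hour boundaries. The Python sketch below (a simulation, not ksqlDB code) uses a hypothetical record whose ets is 100 ms before 18:00 UTC but which was written to the topic 300 ms after 18:00. The record layout (a dict with "timestamp" and "value" keys) and the helper event_time are made up for illustration; they stand in for the default Kafka timestamp versus TIMESTAMP='ets'.

```python
HOUR_MS = 60 * 60 * 1000  # SIZE 1 HOURS in epoch milliseconds


def window_start(ts_ms: int, size_ms: int = HOUR_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def event_time(record: dict, use_ets: bool) -> int:
    """Hypothetical helper: pick the Kafka record timestamp (default) or the ets field."""
    return record["value"]["ets"] if use_ets else record["timestamp"]


# Hypothetical record: ets is 17:59:59.900 UTC, Kafka timestamp is 18:00:00.300 UTC.
record = {
    "timestamp": 1642701600300,
    "value": {"name": "Won", "message": "This message is from Won", "ets": 1642701599900},
}

if __name__ == "__main__":
    print(window_start(event_time(record, use_ets=False)))  # start of the 18:00 UTC window
    print(window_start(event_time(record, use_ets=True)))   # start of the 17:00 UTC window
```

With the default timestamp the record is counted in the 18:00 hour; with TIMESTAMP='ets' it is counted in the 17:00 hour, which is usually what you want if ets records when the event actually happened.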
    

    Ref my blog: https://rmoff.net/2020/09/08/counting-the-number-of-messages-in-a-kafka-topic/