Tags: apache-kafka, apache-kafka-streams, ksqldb

Get Latest value from Kafka


I have a Kafka topic called A.

The format of the data in topic A is:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000}

On the consumer side, I want to get only the latest record for each one-hour window. That is, every hour I need the record from the topic with the latest created_at value.

My expected output is :

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000}
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}

I think this can be solved with KSQL, but I'm not sure. Please help me.

Thanks in advance.


Solution

  • Yes, you can use KSQL for this. Try the following:

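    -- Declare a stream over the source topic (replace 'topic_name' with your topic, e.g. 'A').
    CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAR)
    WITH (kafka_topic = 'topic_name', value_format = 'JSON');
    
    -- For each one-hour tumbling window, keep the maximum created_at per (id, name).
    -- Pattern letters are case-sensitive: MM = month, HH = hour of day (0-23).
    CREATE TABLE maxRow AS
    SELECT
        id,
        name,
        MAX(STRINGTOTIMESTAMP(created_at, 'yyyy-MM-dd HH:mm:ss.SSS')) AS created_at
    FROM S1
    WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY id, name;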
    

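    The created_at column in the result is a Unix epoch timestamp in milliseconds (a BIGINT). You can convert it to your desired string format with the TIMESTAMPTOSTRING UDF in a new query. Note that because the query groups by id and name, it produces one row per (id, name) pair in each window.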
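
    As a rough sketch of that follow-up query: the names maxRowFormatted and created_at_str are hypothetical, and whether a derived table over a windowed table is accepted as written may depend on your KSQL/ksqlDB version.

    ```sql
    -- Hypothetical follow-up query: render the numeric timestamp as text.
    -- maxRowFormatted and created_at_str are illustrative names only.
    CREATE TABLE maxRowFormatted AS
    SELECT
        id,
        name,
        TIMESTAMPTOSTRING(created_at, 'yyyy-MM-dd HH:mm:ss.SSS') AS created_at_str
    FROM maxRow;
    ```

    The format pattern here mirrors the one used for parsing, so the output should look like the created_at strings in the source topic.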

    Please let me know if you find any issues.