apache-kafka confluent-platform ksqldb

Can we select specific records from a Confluent Kafka topic?


In my local Confluent Platform, I have one topic called "FOO_02". I have manually inserted some records into it, so I can print them from the beginning with the following command:

print 'FOO_02' from beginning;


Can I pull only the records where COL1 = 1, the way a SELECT statement with a WHERE clause pulls data from a conventional database such as Db2?

I have tried the following command, but I believe it only picks up new data, because it returns no records:

ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;

Solution

  • I'm presuming that you've already done

    CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');
    

    because otherwise your SELECT would have failed.

    Since PRINT shows that there is data in the topic, you can query the stream with the predicate you want. You just need to tell ksqlDB to process all the data in the topic and not only the new records (which is what FROM BEGINNING does in the PRINT statement). To do this, run:

    SET 'auto.offset.reset' = 'earliest';
    

    and then run the SELECT.
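
    Put together, the session might look like the sketch below. The LIMIT clause is an addition here and not part of the original question; it makes the push query stop after a fixed number of rows instead of running until cancelled, and the value 5 is arbitrary.

    ```sql
    -- Read the topic from the earliest offset for queries started in this session.
    SET 'auto.offset.reset' = 'earliest';

    -- Push query over existing and new records where COL1 is 1.
    -- LIMIT 5 is an illustrative cap; drop it to keep the query running.
    SELECT * FROM FOO_02 WHERE COL1=1 EMIT CHANGES LIMIT 5;
    ```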


    Edit

    Can we select only the latest record? For example, multiple records are pushed to the topic for COL1=0, but I only want the latest one, because it is the newest data and the only correct one. Something like WHERE rowtime = max?

    What you're describing is a TABLE: the latest value for a given key.

    CREATE TABLE FOO AS
      SELECT COL1, 
             LATEST_BY_OFFSET(COL2) AS COL2
        FROM FOO_02
       WHERE COL1=0
       GROUP BY COL1;
    

    The resulting table will have a single row for COL1=0, and its COL2 will be updated to the latest value as new messages are received.
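
    Once the table exists, its current state can be read with a pull query, as in the sketch below. This assumes a ksqlDB version that supports pull queries on tables built with CREATE TABLE AS SELECT, and that COL1 is the table's key (which follows from the GROUP BY in the statement that creates FOO).

    ```sql
    -- Pull query: returns the current row for the key and then exits
    -- (note the absence of EMIT CHANGES).
    SELECT COL1, COL2 FROM FOO WHERE COL1=0;
    ```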

    Is there any way to bring the old data in the stream into the table as well?

    To process existing data too, set the offset back to earliest before running the CREATE statement:

    SET 'auto.offset.reset' = 'earliest';
    
    CREATE TABLE FOO AS
    […]
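
    The SET applies to statements issued later in the same CLI session. If you later want new queries to start from new records again, the property can be cleared. A minimal sketch, assuming the server default for this property has not been changed from latest:

    ```sql
    -- Revert to the default offset behaviour for statements run after this point.
    UNSET 'auto.offset.reset';
    ```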