Search code examples
javaapache-kafkaksqldb

How to query directly from a kafka topic?


I've looked into Interactive queries and KSQL but I can't seem to figure out if querying for a specific record(s) based on key is possible.

Say I have a record in a topic as shown:

{
  key: 12314,
  value: 
    {
       id: "1",
       name: "bob"
    }
}

Would it be possible to search for key 12314 in a topic? Also does KSQL and interactive queries consume the entire topic to do queries?


Solution

  • Assuming your value is valid JSON (i.e. the field names are also quoted) then you can do this easily with KSQL/ksqlDB:

    Examine the Kafka topic in ksqlDB:

    ksql> PRINT test3;
    Format:JSON
    1/9/20 12:11:35 PM UTC , 12314 , {"id": "1", "name": "bob"    } 
    

    Declare the stream:

    ksql> CREATE STREAM FOO (ID VARCHAR, NAME VARCHAR) 
            WITH (KAFKA_TOPIC='test3',VALUE_FORMAT='JSON');
    

    Filter the stream as data arrives

    ksql> SELECT ROWKEY, ID, NAME FROM FOO WHERE ROWKEY='12314' EMIT CHANGES;
    +----------------------------+----------------------------+----------------------------+
    |ROWKEY                      |ID                          |NAME                        |
    +----------------------------+----------------------------+----------------------------+
    |12314                       |1                           |bob                         |
    

    asciicast