
How to manipulate Kafka key documents with ksqlDB?


I can't find a way to create a stream that filters on the key of a Kafka message.

I would like to filter on and manipulate the JSON in the Kafka key to retrieve the payload, which corresponds to my Couchbase ID, as in the following example:

ksql> print 'cb_bench_products-get_purge' limit 1;

Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING

rowtime: 2022/03/04 10:49:43.643 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {[...]}}

Solution

  • You didn't specify the value part of your message, so I've mocked up some data and assumed that it's also JSON. First I load it into a topic to test against:

    $ kcat -b localhost:9092 -t test -P -K!
    {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}!{"col1":"foo","col2":"bar","col3":42}
    ^D
    

    Check the data in the topic using ksqlDB (0.23.1-rc9):

    ksql> print 'test' from beginning;
    Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 2022/03/04 14:14:01.539 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {"col1":"foo","col2":"bar","col3":42}, partition: 0
    

    Declare a stream over the topic, using a STRUCT to represent the nested JSON in the key. I'm assuming you are not interested in the key's schema field, so I've omitted it from the STRUCT.

    CREATE STREAM my_test (
      my_key_col STRUCT < payload VARCHAR > KEY,
      col1 VARCHAR,
      col2 VARCHAR,
      col3 INT
    ) WITH (KAFKA_TOPIC = 'test', FORMAT = 'JSON');
    

    Query the stream with a predicate on the key:

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT my_key_col->payload, col1, col2, col3
      FROM my_test
     WHERE my_key_col->payload LIKE 'history%'
    EMIT CHANGES;
    
    +--------------------------------------------+-------+------+-------+
    |PAYLOAD                                     |COL1   |COL2  |COL3   |
    +--------------------------------------------+-------+------+-------+
    |history::05000228023411_RO_RO11219082::80   |foo    |bar   |42     |
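
    The SELECT above is a transient query. As a minimal sketch (not run against your topic), a persistent stream that keeps only the matching keys could look like the following. The names my_test_history, couchbase_id and doc_ref are hypothetical, and the SPLIT on '::' assumes every payload has the same three-part, '::'-separated shape as the sample key:

    ```sql
    -- Hypothetical stream name; writes the filtered rows to a new backing topic.
    CREATE STREAM my_test_history AS
      SELECT my_key_col,                                     -- key column must appear in the projection
             my_key_col->payload AS couchbase_id,            -- full id copied into the value
             SPLIT(my_key_col->payload, '::')[2] AS doc_ref, -- middle part; ksqlDB arrays are one-indexed
             col1, col2, col3
        FROM my_test
       WHERE my_key_col->payload LIKE 'history%'
      EMIT CHANGES;
    ```

    The key stays a STRUCT here, so downstream queries would still use my_key_col->payload to reach the id.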