I have a problem. I can't find a way to create a stream by filtering on the key of a kafka document.
I would like to filter and manipulate the json of a kafka key to retrieve the payload
of the following example which corresponds to my couchbase id:
ksql> print 'cb_bench_products-get_purge' limit 1;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/03/04 10:49:43.643 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {[...]}}
You didn't specify the value
part of your message so I've mocked up some data and assumed that it's also JSON. First I load it into a topic to test again:
$ kcat -b localhost:9092 -t test -P -K!
{"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}!{"col1":"foo","col2":"bar","col3":42}
^D
Check the data in the topic using ksqlDB (0.23.1-rc9):
ksql> print 'test' from beginning;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/03/04 14:14:01.539 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {"col1":"foo","col2":"bar","col3":42}, partition: 0
Declare a stream over the topic, using a STRUCT
to represent the nested JSON. I'm assuming you are not interested in the schema
so I've omitted that.
CREATE STREAM my_test (
my_key_col STRUCT < payload VARCHAR > KEY,
col1 VARCHAR,
col2 VARCHAR,
col3 INT
) WITH (KAFKA_TOPIC = 'test', FORMAT = 'JSON');
Query the stream with a predicate on the key:
SET 'auto.offset.reset' = 'earliest';
SELECT my_key_col->payload, col1, col2, col3
FROM my_test
WHERE my_key_col->payload LIKE 'history%'
EMIT CHANGES;
+--------------------------------------------+-------+------+-------+
|PAYLOAD |COL1 |COL2 |COL3 |
+--------------------------------------------+-------+------+-------+
|history::05000228023411_RO_RO11219082::80 |foo |bar |42 |