Search code examples
apache-kafkaksqldb

KSQLDB - how to add message with key to Kafka topic with using serializer JSON


I'm trying to add message with key to Kafka topic (running locally on DOCKER). I don't know why, JSON serializer works for value but not for key.

Creating the topic and stream in KSQLDB CLI using the command: CREATE STREAM IF NOT EXISTS users_stream (id VARCHAR KEY, employeeId BIGINT) WITH (KAFKA_TOPIC='users_topic', KEY_FORMAT='JSON', VALUE_FORMAT='JSON', PARTITIONS=2);

After 'SHOW STREAMS;' command: Show streams

Add message by kafka-console-producer using command: docker-compose exec broker kafka-console-producer --broker-list localhost:29092 --property parse.key=true --property key.separator="&" --property key.serializer=custom.class.serialization.JsonSerializer --property value.serializer=custom.class.serialization.JsonSerializer --topic users_topic

And add message: {"id":"COLE888"}&{"employeeId":"1470258"}

After 'SELECT * FROM user_stream;' column 'id' contains JSON String. select * from streams

What should I change to get only value at column "ID" which is the key?


Solution

  • If you only want one column, then select it

    SELECT id FROM users_stream
    

    When you say "at, or within, a column", you have just a varchar, so it is not "a column". If that's your intention, use STRUCT types.

    If you're asking how to unwrap the data, then you will either need to use json extraction functions, or just don't produce JSON data as your key.