I'm trying to add message with key to Kafka topic (running locally on DOCKER). I don't know why, JSON serializer works for value but not for key.
Creating the topic and stream in KSQLDB CLI using the command:
CREATE STREAM IF NOT EXISTS users_stream (id VARCHAR KEY, employeeId BIGINT) WITH (KAFKA_TOPIC='users_topic', KEY_FORMAT='JSON', VALUE_FORMAT='JSON', PARTITIONS=2);
After 'SHOW STREAMS;' command: Show streams
Add message by kafka-console-producer using command:
docker-compose exec broker kafka-console-producer --broker-list localhost:29092 --property parse.key=true --property key.separator="&" --property key.serializer=custom.class.serialization.JsonSerializer --property value.serializer=custom.class.serialization.JsonSerializer --topic users_topic
And add message:
{"id":"COLE888"}&{"employeeId":"1470258"}
After 'SELECT * FROM user_stream;' column 'id' contains JSON String. select * from streams
What should I change to get only value at column "ID" which is the key?
If you only want one column, then select it
SELECT id FROM users_stream
When you say "at, or within, a column", you have just a varchar, so it is not "a column". If that's your intention, use STRUCT
types.
If you're asking how to unwrap the data, then you will either need to use json extraction functions, or just don't produce JSON data as your key.