My team is using Confluent Kafka (enterprise edition) and we are very new to Kafka.
We have two topics, A and B.
Topic A receives JSON messages serialized with an Avro schema (via the Schema Registry URL).
For various reasons, the development tool we are using cannot consume Avro-formatted messages from topic A. We created topic B and want to use ksqlDB to copy all messages from topic A to topic B, converting them from Avro to plain JSON, so that we can build a component that picks up JSON messages from topic B.
Could you please show me the ksqlDB code to create a stream that does this?
Register the inbound Avro data stream
CREATE STREAM MY_AVRO_SOURCE
WITH (KAFKA_TOPIC='my_source_topic', FORMAT='AVRO');
Tell ksqlDB to read all messages from the beginning of the topic
SET 'auto.offset.reset' = 'earliest';
Write all the messages to a new stream in JSON
CREATE STREAM MY_JSON_TARGET
WITH (FORMAT='JSON')
AS SELECT * FROM MY_AVRO_SOURCE;
By default this will populate a target topic called MY_JSON_TARGET; you can specify KAFKA_TOPIC in the WITH clause if you want to use a different target topic name.
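For example, a sketch of the target stream with an explicit topic name (the topic name my_json_topic is illustrative, not from your setup):

```sql
-- Write the converted messages to an explicitly named topic
CREATE STREAM MY_JSON_TARGET
  WITH (KAFKA_TOPIC='my_json_topic', FORMAT='JSON')
  AS SELECT * FROM MY_AVRO_SOURCE;
```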
If only the value (not the key) in the source is Avro, then use VALUE_FORMAT instead of FORMAT in the WITH clause.
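As a sketch, assuming your source topic has plain (e.g. string) keys and Avro values, the two statements would look like this instead:

```sql
-- Source with Avro values but non-Avro keys: set only the value format
CREATE STREAM MY_AVRO_SOURCE
  WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='AVRO');

-- Target stream re-serializes only the value as JSON
CREATE STREAM MY_JSON_TARGET
  WITH (VALUE_FORMAT='JSON')
  AS SELECT * FROM MY_AVRO_SOURCE;
```

VALUE_FORMAT leaves the key bytes untouched, which avoids ksqlDB looking up a (non-existent) Avro key schema in the Schema Registry.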
Ref: https://docs.ksqldb.io/en/latest/reference/serialization/