apache-kafka ksqldb

How to copy and transform all messages from one Kafka topic (in Avro format) to another topic (in JSON format)


My team is using Confluent Kafka (enterprise version), and we are very new to Kafka.
We have two topics, A and B.

Topic A receives all messages, which are serialized with an Avro schema (referenced by a Schema Registry URL).

For some reason, the development tool we are using does not support receiving messages from topic A in Avro format. We created topic B and want to use ksqlDB to copy all messages from topic A to topic B, converting them from Avro to plain JSON along the way, so that we can develop a component that picks up JSON messages from topic B.

Could you please show me the code to create a ksqlDB stream that does this?


Solution

  • Register the inbound Avro data stream

    CREATE STREAM MY_AVRO_SOURCE
      WITH (KAFKA_TOPIC='my_source_topic', FORMAT='AVRO');
    

    Tell ksqlDB to read all messages from the beginning of the topic

    SET 'auto.offset.reset' = 'earliest';
    

    Write all the messages to a new stream in JSON

    CREATE STREAM MY_JSON_TARGET 
      WITH (FORMAT='JSON') 
      AS SELECT * FROM MY_AVRO_SOURCE;
    

    By default this will populate a target topic called MY_JSON_TARGET; you can specify KAFKA_TOPIC in the WITH clause if you want to use a different target topic name.
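
    To check that the conversion worked, something like the following can be run from the ksqlDB CLI. This is only a sketch: it assumes the stream and topic names used above, and that the default target topic name MY_JSON_TARGET was kept.

    ```sql
    -- Show the columns ksqlDB inferred from the Avro schema in Schema Registry
    DESCRIBE MY_AVRO_SOURCE;

    -- Print a few records from the target topic to confirm they are JSON
    PRINT 'MY_JSON_TARGET' FROM BEGINNING LIMIT 5;
    ```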

    If only the value (not the key) in the source is Avro, then use VALUE_FORMAT instead of FORMAT in the WITH clause.
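
    As an alternative to the statements above, a variant that combines both options might look like the sketch below. The target topic name my_target_topic is hypothetical, and the example assumes the message key is not Avro, so only the value format is declared.

    ```sql
    -- Source stream: only the message value is Avro
    CREATE STREAM MY_AVRO_SOURCE
      WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='AVRO');

    -- Read the source topic from the beginning
    SET 'auto.offset.reset' = 'earliest';

    -- Target stream: write values as JSON to an explicitly named topic
    -- ('my_target_topic' is a placeholder name, not from the original answer)
    CREATE STREAM MY_JSON_TARGET
      WITH (KAFKA_TOPIC='my_target_topic', VALUE_FORMAT='JSON')
      AS SELECT * FROM MY_AVRO_SOURCE;
    ```

    Since the key format is left untouched here, keys are passed through using whatever default key format the ksqlDB server is configured with.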

    Ref: https://docs.ksqldb.io/en/latest/reference/serialization/