apache-kafka kcat

How to copy some messages from one Kafka topic to another from bash?


Please help.

We have two Kafka topics. I want to copy the first 10 messages from topic1 to topic2.

I am trying to do it with kafka-console-consumer and kafka-console-producer.

First I save 10 messages from topic1 to a directory:

for (( i=1; i<=10; i++ )); do bin/kafka-console-consumer.sh --bootstrap-server 1.1.2.3:9092 --group CONSUMER1 --topic TOPIC1 --max-messages 1 > /tmp/_topic/$i.msg; done;

Then I try to send them to topic2 with kafka-console-producer:

for (( i=1; i<=10; i++ )); do  bin/kafka-console-producer.sh --broker-list 1.1.2.4:9092 --topic TOPIC2 < /tmp/_topic/$i.msg; done;

And I get an error: my service can't deserialize the data. My questions are:

  1. Will my solution work?
  2. Why do I get this error?
  3. What is the best way to do a one-time copy of messages from one topic to another?

UPD: How I solved this problem (thanks to Robin Moffatt): I used Kafka MirrorMaker with this jar: https://github.com/opencore/mirrormaker_topic_rename. With it I can copy messages from one Kafka topic to another within one cluster.


Solution

  • You can do this with kafkacat (since renamed kcat):

    kafkacat -b localhost:9092 -C -t source-topic -K: -e -o beginning -c10 | \
    kafkacat -b localhost:9092 -P -t target-topic -K: 
    
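    • | pipes the output of the first kafkacat (the -C consumer) into the input of the second kafkacat (the -P producer)
    • -c10 means consume just 10 messages
    • -o beginning means start at the beginning of the topic
    • -K: means the consumer prints each message key and the producer parses it, with : separating the key from the value
    • -e means exit once the end of the topic is reached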
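
    The same pipeline can be wrapped in a small bash function so the broker, topics, and message count are not hard-coded. This is only a sketch: the function name copy_messages and its argument order are hypothetical, and the tab default for the key delimiter is an assumption made here because a : delimiter is ambiguous when a key itself contains a colon. It uses only the kafkacat flags from the command above.

```shell
#!/usr/bin/env bash
# copy_messages BROKER SOURCE_TOPIC TARGET_TOPIC COUNT [KEY_DELIM]
# Hypothetical helper: copies the first COUNT messages of SOURCE_TOPIC
# to TARGET_TOPIC on the same broker, preserving keys.
copy_messages() {
  local broker=$1 source_topic=$2 target_topic=$3 count=$4
  # Assumed default: a tab delimiter, written in kafkacat's escape form.
  local key_delim='\t'
  if [ "$#" -ge 5 ]; then
    key_delim=$5
  fi

  # Consumer (-C) reads from the beginning and exits after COUNT messages;
  # producer (-P) parses the same key delimiter from its input.
  kafkacat -b "$broker" -C -t "$source_topic" -K "$key_delim" \
           -e -o beginning -c "$count" |
    kafkacat -b "$broker" -P -t "$target_topic" -K "$key_delim"
}
```

    For example, copy_messages localhost:9092 source-topic target-topic 10 should behave like the command above, apart from the delimiter. It is still line-oriented, so it is only suitable for text payloads.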

    Note that this won't work if you've got binary data (e.g. Avro), since the pipe treats each message as a line of text. To do this properly, use something like Replicator or MirrorMaker 2 with a ByteArrayConverter.
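
    One way to see why line-oriented tools break binary payloads, and a likely reason for the deserialization error in the question: a binary message can legitimately contain the newline byte (0x0a), and anything that reads "one message per line" will split it there. The sketch below needs no Kafka at all. The 5-byte payload is made up for illustration and simply stands in for a serialized record.

```shell
#!/usr/bin/env bash
# Hypothetical 5-byte binary payload; the third byte is 0x0a (newline).
payload_file=$(mktemp)
printf '\001\002\012\020\177' > "$payload_file"

# Size of the single original message, in bytes.
bytes=$(wc -c < "$payload_file" | tr -d ' ')

# Count the chunks a line-by-line reader would treat as separate messages.
chunks=0
while IFS= read -r line || [ -n "$line" ]; do
  chunks=$((chunks + 1))
done < "$payload_file"

echo "payload bytes: $bytes"
echo "messages seen by a line reader: $chunks"
```

    Here a single 5-byte message is seen as two messages, neither of which is a valid record any more. A byte-for-byte copy (for example MirrorMaker 2 with ByteArrayConverter, as mentioned above) avoids the text round trip entirely.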