apache-kafka apache-kafka-connect debezium kcat

How to manipulate offsets of the source database for Debezium


I've been experimenting with Kafka and am trying to change the offsets of the source database for Debezium, following the FAQ at https://debezium.io/documentation/faq/. I was able to do it successfully, but I would like to know how to do the same with native Kafka commands instead of kafkacat.
These are the kafkacat commands I'm using:

kafkacat -b kafka:9092 -C -t connect_offsets -f 'Partition(%p) %k %s\n'

and

echo '["In-house",{"server":"optimus-prime"}]|{"ts_sec":1657643280,"file":"mysql-bin.000200","pos":2136,"row":1,"server_id":223344,"event":2}' | \
kafkacat -P -b kafka:9092 -t connect_offsets -K \| -p 2

This reverts the offset of the source system to an earlier binlog position, so I can read the database from a previous point in time. It works well, but I'd like to know how to compose the same thing with native Kafka tools, since we don't have kafkacat on our dev/prod servers (I do see its value, and it may be installed later). This is what I have so far for the translation, but it isn't quite doing what I expect.

./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic connect_offsets \
  --property print.offset=true --property print.partition=true \
  --property print.headers=true --property print.timestamp=true \
  --property print.key=true --from-beginning

Running this returns results, so the consumer command works well. When I try to translate the producer command, though, I run into issues.

./kafka-console-producer.sh --bootstrap-server kafka:9092 --topic connect_offsets \
  --property parse.partition=true --property parse.key=true \
  --property key.separator=":"

I get a prompt after the producer command, and I enter this:

["In-house",{"server":"optimus-prime"}]:{"ts_sec":1657643280,"file":"mysql-bin.000200","pos":2136,"row":1,"server_id":223344,"event":2}:2

But the record doesn't seem to take effect, because the binlog position hasn't changed when I run the consumer command again. Any ideas?

EDIT: After applying OneCricketeer's changes I'm getting a stack trace.


Solution

  • key.separator=":" looks like it will be an issue, since the producer will split your data at the first colon, i.e. at ["In-house",{"server":

    So you produced a bad event into the topic, and may have broken the connector.

    If you want to use the same command literally, keep your key separator as |, or use any other character that does not appear in the key.

    Also, parse.partition isn't a property that is used, so you should remove the :2 at the end. I'm not even sure kafka-console-producer can target a specific partition.
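
To make the separator problem concrete, here is a minimal sketch that mimics the "split at the first separator" behaviour with plain shell parameter expansion, then wraps the adjusted producer call in a function. The function name `produce_offset` is hypothetical, the broker address and script path are taken from the question, and the producer flags are limited to the ones already discussed above (`parse.key`, `key.separator`). Treat it as an illustration under those assumptions, not a verified recovery procedure.

```shell
#!/bin/sh
# Key and value copied from the question.
key='["In-house",{"server":"optimus-prime"}]'
value='{"ts_sec":1657643280,"file":"mysql-bin.000200","pos":2136,"row":1,"server_id":223344,"event":2}'

# With ":" as the separator, the first ":" sits inside the JSON key,
# so everything after it is treated as the value.
bad_record="${key}:${value}"
bad_key="${bad_record%%:*}"      # text before the first ":"
printf 'key parsed with ":" -> %s\n' "$bad_key"

# With "|" as the separator, the first "|" falls exactly between key and value.
good_record="${key}|${value}"
good_key="${good_record%%|*}"    # text before the first "|"
printf 'key parsed with "|" -> %s\n' "$good_key"

# Hypothetical helper (defined but not called here): sends the record
# through the console producer using "|" as the key separator.
produce_offset() {
  printf '%s\n' "$good_record" | ./kafka-console-producer.sh \
    --bootstrap-server kafka:9092 \
    --topic connect_offsets \
    --property parse.key=true \
    --property 'key.separator=|'
}
```

Calling `produce_offset` on a host with the Kafka scripts would send the record without an explicit partition. Assuming the key bytes match the existing key exactly, the default partitioner should hash it to the same partition as before, but it is worth confirming with the consumer command from the question.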