I am using Debezium on a MySQL table to capture change logs to Kafka, with the Kafka Connect configuration below:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "xxxx",
"database.password": "xxxx",
"database.server.id": "42",
"database.server.name": "xxxx",
"table.whitelist": "demo.movies",
"database.history.kafka.bootstrap.servers": "broker:9092",
"database.history.kafka.topic": "dbhistory.demo" ,
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"asgard.demo.(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
However, it is sending all of the existing records from the table to the Kafka topic.
Is there any way to read only the new changelog data?
The default behavior (snapshot.mode set to initial) is to snapshot the table first, i.e. read all existing rows into the topic, and only then stream new changes from the binlog.
To skip the data snapshot and read only new changes, add "snapshot.mode": "schema_only" to the connector configuration. In this mode the connector captures just the table schemas and then starts streaming from the current binlog position.
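As a minimal sketch, the relevant part of the configuration with the new property would look like this (all other properties from the question stay the same):

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.mode": "schema_only",
  "database.history.kafka.bootstrap.servers": "broker:9092",
  "database.history.kafka.topic": "dbhistory.demo"
}

Note that schema_only still snapshots the table schemas (just not the data) and records them in the database history topic, so database.history.kafka.topic is still required. Also, the snapshot mode is only consulted when the connector starts with no stored offsets; setting it on an already-running connector will not remove records that were already written to the topic. Assuming Kafka Connect is running in distributed mode on its default port, and with a hypothetical connector name and config file, you could apply the full updated configuration via the REST API:

curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/source-mysql-movies/config \
  -d @connector-config.json

Recent Debezium releases deprecate schema_only in favor of the equivalent no_data mode, so check the documentation for your version.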