
Debezium MySQL Kafka Connect: capture only new changelog data


I am using Debezium on a MySQL table to capture changelogs to Kafka with the following Kafka Connect configuration:

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "xxxx",
    "database.password": "xxxx",
    "database.server.id": "42",
    "database.server.name": "xxxx",
    "table.whitelist": "demo.movies",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "dbhistory.demo",
    "decimal.handling.mode": "double",
    "include.schema.changes": "true",
    "transforms": "unwrap,dropTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "asgard.demo.(.*)",
    "transforms.dropTopicPrefix.replacement": "$1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
}

However, it is sending all existing records from the table to the Kafka topic.

Is there any way to read only new changelog data?


Solution

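  • By default, the connector first takes a snapshot of the table (reading all existing rows) and only then streams new changes from the binlog.

    To read only new data, add "snapshot.mode": "schema_only" to the connector configuration. The connector then captures just the table schema and streams changes made from that point on.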
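As a rough sketch of applying this fix, the Python snippet below copies a connector config, sets "snapshot.mode" to "schema_only", and builds a PUT request for the Kafka Connect REST endpoint /connectors/{name}/config, which creates or updates a connector's configuration. The Connect URL (http://localhost:8083) and the connector name "movies-source" are hypothetical placeholders, and only a subset of the question's settings is shown. The request is built but not sent, so the example runs without a live Connect worker.

```python
import json
import urllib.request

# Hypothetical placeholders: substitute your Connect worker URL and connector name.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "movies-source"


def with_schema_only_snapshot(config: dict) -> dict:
    """Return a copy of the connector config that skips the initial data
    snapshot and captures only the table schema before streaming changes."""
    updated = dict(config)  # leave the caller's dict untouched
    updated["snapshot.mode"] = "schema_only"
    return updated


def build_update_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request
    for the Kafka Connect REST API."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Subset of the question's settings, for brevity.
    base = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "table.whitelist": "demo.movies",
    }
    req = build_update_request(CONNECT_URL, CONNECTOR_NAME, with_schema_only_snapshot(base))
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To apply it against a running Connect worker:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status)
```

Keep in mind that Debezium consults the snapshot mode when the connector starts without stored offsets. Records that an earlier snapshot already wrote to the topic are not removed by changing this setting, so you may need to start from a fresh topic or use a new connector name.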