mysql apache-kafka apache-kafka-connect debezium cdc

Change Data Capture with Debezium, but only capture change records


I have two databases, db1 and db2, each with the same 4 tables, and the data in those tables is currently identical in both. I want to run CDC so that only changes made in db1 are captured and applied to db2. I don't want to load all existing data from db1 into Kafka topics, but on first start the connector pulls everything into the topics. What Debezium source connector configuration would avoid this?

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
    {
        "name": "mysql5-source",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "10",
            "database.hostname": "host",
            "database.port": "3307",
            "database.user": "root",
            "database.password": "secret",
            "database.server.id": "11",
            "database.server.name": "dbserver",
            "database.whitelist": "dbname",
            "table.whitelist": "dbname.exm1,dbname.exm4,dbname.exm2,dbname.exm3",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "mysql5table",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter.schemas.enable":true,
            "value.converter.schemas.enable":true,
            "transforms": "unwrap,dropTopicPrefix,pushed_on,first_transfer_date,mem_dob,pushed_date,AL_Date,A_Last_login,live_time,A_Date,callMeDate",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
            "transforms.dropTopicPrefix.replacement":"$1"
        }
    }'

This is the connector I am using now, but on first start it pulls all existing data. I only need the new records. Thanks in advance!


Solution

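  • I think what you're looking for is the snapshot.mode connector property. With schema_only, the connector snapshots only the table schemas, not the existing rows, and then streams the changes made after it started: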

    snapshot.mode=schema_only
    

    https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-connector-properties
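
    As a minimal sketch of where the property goes, assuming the same connector definition as in the question: snapshot.mode sits inside the "config" object next to the existing database.* properties. Only a few of the original properties are repeated here for orientation, so this fragment is not a complete payload on its own; the converters, history topic, and transforms from the question stay unchanged. As far as I know, the snapshot runs only when the connector has no stored offsets, so the setting should be in place before the connector is started for the first time.

```json
{
    "name": "mysql5-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.server.name": "dbserver",
        "database.whitelist": "dbname",
        "table.whitelist": "dbname.exm1,dbname.exm4,dbname.exm2,dbname.exm3",
        "snapshot.mode": "schema_only"
    }
}
```

    With this mode the topics contain only changes made after the connector started, so db2 has to match db1 already at that point, which is the situation described in the question.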