Search code examples
mysql · apache-kafka · apache-kafka-connect · debezium

How to tell the Debezium MySQL source connector to stop retaking snapshots of existing tables in Kafka topics?


I'm using the Debezium MySQL CDC source connector to move a database from MySQL to Kafka. The connector is working fine except for snapshots, where it's acting weird: the connector took the first snapshots successfully, then after a few hours it went down because it hit a heap memory limit (this is not the problem). I paused the connector, stopped the worker on the cluster, fixed the issue, then started the worker again... The connector is now running fine but taking snapshots again! It looks like the connector is not resuming from where it left off, and I think something is wrong in my configs. I'm using Debezium 0.95.

I changed snapshot.mode from initial to initial_only, but that didn't help.

Connect properties:

{
  "properties": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "snapshot.locking.mode": "minimal",
    "errors.log.include.messages": "false",
    "table.blacklist": "mydb.someTable",
    "include.schema.changes": "true",
    "database.jdbc.driver": "com.mysql.cj.jdbc.Driver",
    "database.history.kafka.recovery.poll.interval.ms": "100",
    "poll.interval.ms": "500",
    "heartbeat.topics.prefix": "__debezium-heartbeat",
    "binlog.buffer.size": "0",
    "errors.log.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "snapshot.fetch.size": "100000",
    "errors.retry.timeout": "0",
    "database.user": "kafka_readonly",
    "database.history.kafka.bootstrap.servers": "bootstrap:9092",
    "internal.database.history.ddl.filter": "DROP TEMPORARY TABLE IF EXISTS .+ /\\* generated by server \\*/,INSERT INTO mysql.rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value \u003d .*,FLUSH RELAY LOGS.*,flush relay logs.*",
    "heartbeat.interval.ms": "0",
    "header.converter": "org.apache.kafka.connect.json.JsonConverter",
    "autoReconnect": "true",
    "inconsistent.schema.handling.mode": "fail",
    "enable.time.adjuster": "true",
    "gtid.new.channel.position": "latest",
    "ddl.parser.mode": "antlr",
    "database.password": "pw",
    "name": "mysql-cdc-replication",
    "errors.tolerance": "none",
    "database.history.store.only.monitored.tables.ddl": "false",
    "gtid.source.filter.dml.events": "true",
    "max.batch.size": "2048",
    "connect.keep.alive": "true",
    "database.history": "io.debezium.relational.history.KafkaDatabaseHistory",
    "snapshot.mode": "initial_only",
    "connect.timeout.ms": "30000",
    "max.queue.size": "8192",
    "tasks.max": "1",
    "database.history.kafka.topic": "history-topic",
    "snapshot.delay.ms": "0",
    "database.history.kafka.recovery.attempts": "100",
    "tombstones.on.delete": "true",
    "decimal.handling.mode": "double",
    "snapshot.new.tables": "parallel",
    "database.history.skip.unparseable.ddl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.ignore.builtin": "true",
    "database.whitelist": "mydb",
    "bigint.unsigned.handling.mode": "long",
    "database.server.id": "6022",
    "event.deserialization.failure.handling.mode": "fail",
    "time.precision.mode": "adaptive_time_microseconds",
    "errors.retry.delay.max.ms": "60000",
    "database.server.name": "host",
    "database.port": "3306",
    "database.ssl.mode": "disabled",
    "database.serverTimezone": "UTC",
    "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
    "database.hostname": "host",
    "database.server.id.offset": "10000",
    "connect.keep.alive.interval.ms": "60000",
    "include.query": "false"
  }
}

Solution

  • I can confirm Gunnar's answer above. I ran into some issues during snapshotting and had to restart the whole snapshotting process. Right now, the connector does not support resuming a snapshot from a certain point. Your configs seem fine to me. Hope this helps.