Tags: mysql, apache-kafka, apache-kafka-connect, debezium

Updating a Debezium MySQL connector with table whitelist option


I'm using the Debezium (0.7.5) MySQL connector and I'm trying to understand the best approach for updating a connector's configuration when the table.whitelist option changes.

Let's say I create a connector, something like this:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'
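(For completeness, once the connector is registered you can confirm it with the standard Kafka Connect REST endpoints; the host below is the same placeholder as above.)

```shell
# List registered connectors (should include "MyConnector")
curl -s http://debezium-host/connectors

# Check the state of the connector and its tasks (RUNNING, PAUSED, FAILED)
curl -s http://debezium-host/connectors/MyConnector/status
```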

After some time (2 weeks), I need to add a new table (myDb.table3) to this table.whitelist option. The table itself is not new; it existed before the connector was created.

What I tried was:

  • Paused the connector.
  • Deleted the history topic (maybe this was the problem?).
  • Updated the config via the API's update-config endpoint.
  • Resumed the connector.
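For reference, the pause and resume steps map to these standard Kafka Connect REST calls (same host placeholder as above; the history-topic deletion was done separately with Kafka's own tooling):

```shell
# Pause the connector: its tasks stop processing, but it stays registered
curl -i -X PUT http://debezium-host/connectors/MyConnector/pause

# ... update the config via PUT /connectors/MyConnector/config ...

# Resume the connector from where it left off
curl -i -X PUT http://debezium-host/connectors/MyConnector/resume
```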

Update command via API:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

But it didn't work, and maybe this isn't the best approach at all. In other connectors I'm not using the table.whitelist option, so when I needed to listen to a new table, I didn't have this problem.

My last option, I think, would be to delete this connector and create a new one with the updated configuration that also captures the new table (myDb.table3). The problem is that to get the initial data from myDb.table3 I would have to create it with snapshot.mode set to initial, and I don't want to regenerate all the snapshot messages for the other tables, myDb.table1 and myDb.table2.


Solution

  • Changes to the whitelist/blacklist config are not yet supported at this point. This is currently being worked on (see DBZ-175), and we hope to have preview support for this in one of the next releases. There's a pending PR for this, which needs a bit more work, though.

    Until this has been implemented, your best option is to set up a new instance of the connector which only captures the additional tables you're interested in. This comes at the price of running two connectors (which both will maintain a binlog reader session), but it does the trick as long as you don't need to change your filter config too often.
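A sketch of that workaround, based on the create call from the question: register a second connector whose table.whitelist contains only the new table. The connector name, database.server.id, database.server.name, and history topic below are illustrative; they must differ from the first connector's values, since server.id has to be unique among the MySQL server's replication clients and each connector needs its own history topic. snapshot.mode "initial" snapshots only the whitelisted myDb.table3.

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnectorTable3",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227198",
      "database.server.name": "MyServerTable3",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table3",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorTable3HistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'
```

Note that the second connector's change events will appear under topics prefixed with its own server name (here MyServerTable3), not the original MyServer prefix, so consumers of the new table's topic need to account for that.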