Search code examples
apache-kafkaapache-kafka-connectconfluent-schema-registrydebezium

What property to use in kafka mysql source connector to register a new version for any schema change


This is the configuration of schema-registry.properties

listeners=http://10.X.X.76:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXXX.1:9092,PLAINTEXT://1XXXX.69:9092
kafkastore.topic=_schemas
debug=false
master.eligibility=true

This is the configuration of my connector,

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "database.user": "cdc_user",
  "tasks.max": "3",
  "database.history.kafka.bootstrap.servers": "10.49.115.249:9092,10.48.130.211:9092,10.54.178.121:9092,10.53.4.69:9092",
  "database.history.kafka.topic": "history.cdc.fkw.supply.mp.seller_facility",
  "database.server.name": "cdc.fkw.supply.mp",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "table.whitelist": "seller_facility.addresses, seller_facility.location, seller_facility.default_location, seller_facility.location_document_mapping",
  "database.hostname": "dog-rr.ffb-supply-ffb-supply-mp.prod.altair.fkcloud.in",
  "database.password": "6X5DpJrVzI",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "name": "cdc.fkw.supply.mp.seller_facility.connector",
  "database.history.skip.unparseable.ddl": "true",
  "errors.tolerance": "all",
  "database.whitelist": "seller_facility",
  "snapshot.mode": "when_needed"
}

How do I register a new schema when there is any change in the schema? What property can I add to do so, so that it just adds a new version to schema registry for that particular topic and is fully compatible.


Solution

  • Assuming your key/value.converter are using one of the Confluent ones, such as AvroConverter for example, any new/removed database columns will automatically be picked up by the Connect framework, and registered to the Registry as part of the serialization in the KafkaAvroSerializer process.

    Changing the database column types might generate errors, for example, changing VARCHAR to INT