This is the configuration in schema-registry.properties:
listeners=http://10.X.X.76:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXXX.1:9092,PLAINTEXT://1XXXX.69:9092
kafkastore.topic=_schemas
debug=false
master.eligibility=true
This is the configuration of my connector:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "minimal",
"database.user": "cdc_user",
"tasks.max": "3",
"database.history.kafka.bootstrap.servers": "10.49.115.249:9092,10.48.130.211:9092,10.54.178.121:9092,10.53.4.69:9092",
"database.history.kafka.topic": "history.cdc.fkw.supply.mp.seller_facility",
"database.server.name": "cdc.fkw.supply.mp",
"heartbeat.interval.ms": "5000",
"database.port": "3306",
"table.whitelist": "seller_facility.addresses, seller_facility.location, seller_facility.default_location, seller_facility.location_document_mapping",
"database.hostname": "dog-rr.ffb-supply-ffb-supply-mp.prod.altair.fkcloud.in",
"database.password": "6X5DpJrVzI",
"database.history.kafka.recovery.poll.interval.ms": "5000",
"name": "cdc.fkw.supply.mp.seller_facility.connector",
"database.history.skip.unparseable.ddl": "true",
"errors.tolerance": "all",
"database.whitelist": "seller_facility",
"snapshot.mode": "when_needed"
}
How do I register a new schema version whenever the table schema changes? Is there a property I can add so that a change simply adds a new version to the Schema Registry for that particular topic, and the new version is fully compatible?
Assuming your key.converter and value.converter are set to one of the Confluent converters, such as AvroConverter, any added or removed database columns will automatically be picked up by the Connect framework and registered with the Registry as part of serialization in the KafkaAvroSerializer.
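For reference, a minimal sketch of the converter settings this assumes, to be added to the connector JSON above (or set at the worker level). The class name and the schema.registry.url keys are the standard Confluent ones; the URL is copied from the listeners value in your schema-registry.properties, and it is an assumption that the Connect workers can reach the Registry at that address:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://10.X.X.76:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://10.X.X.76:8081"
}
```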
Changing a database column's type might generate errors, though; for example, changing VARCHAR to INT produces a schema that is not compatible with the previously registered version.
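As for "fully compatible": as far as I know that is not a connector property but a Registry setting. Compatibility is configured per subject (or globally) through the Registry's REST API, and the default level is BACKWARD. A sketch of the request body you would PUT to /config/<subject> to switch a subject to FULL follows. The subject would be something like cdc.fkw.supply.mp.seller_facility.addresses-value, assuming Debezium's default topic naming and the default subject name strategy:

```json
{
  "compatibility": "FULL"
}
```

With FULL in place, a change that is not both backward and forward compatible is rejected by the Registry when the converter tries to register it, rather than being added as a new version.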