Search code examples
apache-kafkaavroapache-kafka-connectconfluent-schema-registry

How to configure kafka connect use Avro Schema?


I have starting to learn Avro. i want to implement it in kafka connect. I use a configuration like the following. Is this the right configuration?

{
    "name": "surveyWawancara-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "key.deserializer": "org.apache.kafka.connect.json.JsonDeserializer",
        "database.user": "Acquisition.ro",
        "database.dbname": "acquisition",
        "value.deserializer": "org.apache.kafka.connect.json.JsonDeserializer",
        "tasks.max": "1",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "database.history.kafka.bootstrap.servers": "beta-kafka-brokers.amq-streams-beta.svc:9092",
        "database.history.kafka.topic": "schema-changes.sl.surveyWawancara",
        "time.precision.mode": "connect",
        "database.server.name": "beta-sl-bn",
        "database.port": "1433",
        "table.whitelist": "dbo.SurveyWawancara",
        "key.converter.schemas.enable": "true",
        "database.hostname": "10.7.76.62",
        "database.password": "Acquisition_ro231!",
        "value.converter.schemas.enable": "true",
        "name": "surveyWawancara-connector",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Solution

  • You've duplicated the converter fields, but these properties are correct, yes

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081", 
    

    Avro always has a schema, so these *.schemas.enable do nothing and can be removed.

    Similarly, the deserializer class configs are not applicable to Connect and are encompassed by the converter configs, so should be removed as well

    Worth mentioning that the key format does not have to (and often doesn't) match the value's