apache-kafka, apache-kafka-connect, confluent-schema-registry

Unable to override the default values of use.schema.id and auto.register.schemas in the Confluent MQTT source connector (Kafka Connect) when using JsonSchemaConverter


schema.registry.url = [http://13.127.201.183:30068]
use.latest.version = false
use.schema.id = -1

What is the use of use.schema.id?

Connector configuration for Kafka Connect:

{
  "name": "TEST12",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "value.converter.schema.registry.url": "http://13.127.201.183:30068",
    "confluent.topic.bootstrap.servers": "aepv6-base-shell-kafka:9092",
    "auto.register.schemas": "false",
    "tasks.max": "2",
    "name": "TEST12",
    "kafka.topic": "TEST-KAFKA-12",
    "mqtt.topics": "TEST-MQTT",
    "mqtt.server.uri": "tcp://broker.hivemq.com:1883",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "confluent.license.topic.replication.factor": "1",
    "use.schema.id": "1"
  }
}

I tried use.schema.id=1, where 1 is the global ID of a schema that already exists in the registry.

http://13.127.201.183:30068/schemas/ids/1 :

"schema": "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}"

I expected the topic to accept only data that adheres to this schema, but it accepts everything that arrives from the MQTT source.


Solution

  • You need to prefix those properties with value.converter. to actually set them on the converter, as you did for the registry address, rather than applying them to the MQTT connector config.

    "What is the use of use.schema.id?"

    The source code can answer that.

    "allowing everything that comes from MQTT source to kafka"

    I don't think the connector will validate record content. It'll simply serialize whatever it gets.
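
    As a rough sketch of the prefixing described in the answer, the connector configuration from the question might be rewritten as below. All values are copied from the question; the only change is that auto.register.schemas and use.schema.id now carry the value.converter. prefix, on the assumption (per the answer) that this is how they reach JsonSchemaConverter. The duplicate "name" entry inside "config" is dropped here. This has not been verified against a running Connect cluster.

{
  "name": "TEST12",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "2",
    "kafka.topic": "TEST-KAFKA-12",
    "mqtt.topics": "TEST-MQTT",
    "mqtt.server.uri": "tcp://broker.hivemq.com:1883",
    "confluent.topic.bootstrap.servers": "aepv6-base-shell-kafka:9092",
    "confluent.license.topic.replication.factor": "1",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://13.127.201.183:30068",
    "value.converter.auto.register.schemas": "false",
    "value.converter.use.schema.id": "1"
  }
}

    If the prefix takes effect, the converter's logged config should show use.schema.id = 1 instead of -1. As noted above, this still would not make the connector reject MQTT payloads that do not match the schema.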