apache-kafka apache-kafka-connect confluent-schema-registry

value.subject.name.strategy for Kafka S3 sink connector is not recognized


How do we configure value.subject.name.strategy, following https://docs.confluent.io/platform/current/schema-registry/connect.html#json-schema? I put various configuration names in worker.properties, but none of them seems to be recognized by the Kafka sink connector. As the logs show, it always defaults to TopicNameStrategy.

[2022-11-21 16:40:23,663] WARN The configuration 'value.converter.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)
        value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
[2022-11-21 16:40:23,690] WARN The configuration 'converter.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)
[2022-11-21 16:40:23,690] WARN The configuration 'value.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)
[2022-11-21 16:40:23,690] WARN The configuration 'value.converter.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)
[2022-11-21 16:40:23,719] WARN The configuration 'converter.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)

I put all of these variations in worker.properties and fed that file to connector_distributed at startup.

grep -i "name.strategy" /plugins/worker.properties 
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer.value.converter.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

Solution

  • Those warnings can be ignored. The Kafka consumer itself doesn't use those properties; only the config inside the serializer/deserializer does. That config is printed separately in the logs, which is likely where you are seeing the default TopicNameStrategy applied.

    There's an open JIRA to silence these warnings, which appear because converter properties get passed along to the consumer.

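    The serializer is configured through the converter, and converter properties must carry the converter prefix:

    value.converter.[property]=[value]

    So, just as with schema.registry.url, the property keeps its own full name (including its leading "value.") after the prefix. Here OtherStrategy stands for the fully qualified strategy class, such as io.confluent.kafka.serializers.subject.RecordNameStrategy:

    value.converter.value.subject.name.strategy=OtherStrategy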
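
As a concrete illustration of the prefix rule above, a worker.properties fragment might look like the sketch below. It assumes the JSON Schema converter (the question links to the JSON Schema docs but doesn't say which converter is actually in use) and a placeholder Schema Registry URL; only the strategy class is taken from the question. Connect strips the value.converter. prefix before handing the remainder to the converter, which is why the key ends up with what looks like a doubled "value.".

```properties
# Assumption: JSON Schema converter; swap in whichever converter you actually use.
value.converter=io.confluent.connect.json.JsonSchemaConverter

# Placeholder URL, not from the question; point this at your own Schema Registry.
value.converter.schema.registry.url=http://localhost:8081

# The converter receives this as value.subject.name.strategy.
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```

If only one connector needs a non-default strategy, the same value.converter.* keys can typically go in that connector's own config instead of the worker file. After a restart, the converter's own config dump (not the ConsumerConfig warnings) is the place to check that RecordNameStrategy was picked up.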