apache-kafka avro apache-kafka-streams spring-cloud-stream confluent-schema-registry

How can we configure value.subject.name.strategy for schemas in Spring Cloud Stream Kafka producers, consumers and KStreams?


I would like to customize the naming strategy of the Avro schema subjects in Spring Cloud Stream Producers, Consumers and KStreams.

With the Confluent Avro serializers this is done through the properties key.subject.name.strategy and value.subject.name.strategy: https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
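To make concrete what the strategy changes, here is a minimal sketch of the subject names that Confluent's three built-in strategies are documented to produce. It does not call the Confluent classes; the enum, the subjectFor helper and the record name com.example.Customer are hypothetical names used only for illustration.

```kotlin
// Illustrative only: mirrors the naming rules documented for Confluent's
// built-in strategies; it does not use the Confluent classes themselves.
enum class SubjectStrategy { TOPIC_NAME, RECORD_NAME, TOPIC_RECORD_NAME }

// Hypothetical helper: returns the Schema Registry subject under which a
// schema would be registered for the given topic and record name.
fun subjectFor(
    strategy: SubjectStrategy,
    topic: String,
    recordFullName: String,
    isKey: Boolean = false
): String = when (strategy) {
    // Default strategy: one subject per topic, suffixed with -key or -value
    SubjectStrategy.TOPIC_NAME -> if (isKey) "$topic-key" else "$topic-value"
    // One subject per record type, independent of the topic
    SubjectStrategy.RECORD_NAME -> recordFullName
    // One subject per (topic, record type) pair
    SubjectStrategy.TOPIC_RECORD_NAME -> "$topic-$recordFullName"
}

fun main() {
    val record = "com.example.Customer" // assumed fully qualified Avro record name
    SubjectStrategy.values().forEach {
        println("$it -> ${subjectFor(it, "customer", record)}")
    }
}
```

For the topic customer, this yields customer-value, com.example.Customer and customer-com.example.Customer respectively, which is why the non-default strategies allow several record types on the same topic.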

In a native Kafka Producer this works:


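import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

private val producer: KafkaProducer<Int, Customer>

init {
    val props = Properties()
    ...
    props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
    // Register value schemas under "<topic>-<fully qualified record name>"
    props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
    producer = KafkaProducer(props)
}

fun sendCustomerEvent(customer: Customer) {
    // Topic "customer", keyed by the customer id
    val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
    producer.send(record)
}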

However, I cannot find how to do this in Spring Cloud Stream. So far I have tried this in a producer:

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Apparently Spring Cloud Stream uses its own subject naming strategy, defined by the interface org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy, which has only one implementation: DefaultSubjectNamingStrategy.

Is there a declarative way of configuring value.subject.name.strategy, or are we expected to provide our own org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy implementation and set the property spring.cloud.stream.schema.avro.subject-naming-strategy?


Solution

  • As pointed out in the other answer, there is a dedicated property, spring.cloud.stream.schema.avro.subjectNamingStrategy, that lets you set a different naming strategy for Kafka producers.

    I contributed org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy, which provides that functionality out of the box.

    For Kafka Streams with native serialization/deserialization (the default behaviour from Spring Cloud Stream 3.0.0+), you have to use Confluent's implementation (io.confluent.kafka.serializers.subject.RecordNameStrategy) and the native properties:

    spring:
      application:
        name: shipping-service
      cloud:
        stream:
          ...
          kafka:
            streams:
              binder:
                configuration:
                  application:
                    id: shipping-service
                  ...
                  value:
                    subject:
                      name:
                        strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
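For the producer case mentioned at the start of this answer, the property could be set roughly as follows. This is a minimal sketch that assumes the spring.cloud.stream.schema.avro prefix used in the question; the prefix may differ in releases where the schema registry support lives in a separate project, so check the documentation for your version.

```yaml
spring:
  cloud:
    stream:
      schema:
        avro:
          # Property and class names as given above; the prefix is version-dependent (assumption)
          subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
```

This setting applies to Spring Cloud Stream's own Avro message converter, not to Confluent's native serializers, which is why the Kafka Streams case needs the value.subject.name.strategy property instead.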