spring-boot kotlin apache-kafka spring-kafka

Spring Kafka consumer listener configuration


My team and I are introducing Spring Kafka into our codebase to gain the benefits of an event-driven ecosystem.

The Spring Kafka configs are as follows:

...
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: >
        org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json.trusted.packages: "{trusted packages}"
          deserializer.value.delegate.class: >-
                  org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        jaas:
          config: >
            ...
        mechanism: PLAIN
      basic:
        auth:
          ...
      schema:
        ...
      bootstrap:
        ...
  ...

app:
  kafka:
    event-type-one:
      topic: >-
        {topic}
      consumer-group-id: {group id}
      default-type: {default type event}
    event-type-two:
      topic: >-
        {topic}

The Kafka Consumer is configured as follows:

    @KafkaListener(
        topics = ["\${app.kafka.path.topic}"],
        groupId = "\${app.kafka.path.group-id}",
        properties = ["spring.json.value.default.type=\${app.kafka.event-type-one.default-type}"]
    )
    fun fooBar(record: ConsumerRecord<String, Any>) = runBlocking {

Note that we've provided a default type for our Consumer because the incoming events carry no type headers, as per the docs [1].

The question is: is providing a default type the correct way to solve the "no headers provided" deserialisation error that occurs when the Consumer tries to deserialise an event? According to the docs [1], a default type must be provided if no type headers are present in the events produced and consumed via our @KafkaListener config.

Is this the right way to implement a Kafka Listener via the Spring Kafka [2] library?

Our goal is to potentially have more Consumers. Since each Consumer would have its own event type, does that mean we need to provide a default type for each one?

[1] https://docs.spring.io/spring-kafka/reference/html/#serdes-json-config

[2] https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka


Solution

  • If your method signature is record: ConsumerRecord<String, Any> and you really cannot express the expected type via that parameter, then the only way is to set the spring.json.value.default.type property. You may also need to set the spring.json.use.type.headers property to false. This way the JsonDeserializer won't look at the headers for type information at all and will fall back to the provided default type. Do you have any problem with that kind of configuration?
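
As a minimal sketch of the configuration described above, assuming the listener properties are set per consumer: each @KafkaListener supplies its own spring.json.value.default.type and disables type headers. The class name EventListeners, the method names, and the event-type-two consumer-group-id and default-type keys are hypothetical; the question only defines a topic for event-type-two.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Hypothetical component; names are for illustration only.
@Component
class EventListeners {

    // Listener-level properties override same-named consumer factory properties,
    // so each listener can declare its own default type.
    @KafkaListener(
        topics = ["\${app.kafka.event-type-one.topic}"],
        groupId = "\${app.kafka.event-type-one.consumer-group-id}",
        properties = [
            "spring.json.value.default.type=\${app.kafka.event-type-one.default-type}",
            // Do not look at type headers; always use the default type above.
            "spring.json.use.type.headers=false"
        ]
    )
    fun onEventOne(record: ConsumerRecord<String, Any>) {
        println("event-type-one: ${record.value()}")
    }

    // Assumed keys: consumer-group-id and default-type for event-type-two
    // would have to be added to the app.kafka configuration.
    @KafkaListener(
        topics = ["\${app.kafka.event-type-two.topic}"],
        groupId = "\${app.kafka.event-type-two.consumer-group-id}",
        properties = [
            "spring.json.value.default.type=\${app.kafka.event-type-two.default-type}",
            "spring.json.use.type.headers=false"
        ]
    )
    fun onEventTwo(record: ConsumerRecord<String, Any>) {
        println("event-type-two: ${record.value()}")
    }
}
```

Under these assumptions, adding another Consumer means adding another listener method with its own default type, while the shared ErrorHandlingDeserializer and JsonDeserializer delegate settings stay in the common spring.kafka.consumer block.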