Search code examples
springapache-kafkaspring-cloud-stream

How to read Kafka Message Key from Spring cloud streams?


I am using spring cloud streams to consume a message from Kafka.

Is it possible to read the Kafka Message Key from the code?

I have a Kafka topic that generally has 2 types of messages. The action to be taken varies depending on the message key. I see the spring documentation has only the following to read the message. Here, I need to specify the actual mapping of the message (Greetings class here). However, I need a way through which I can read the message key and determine the deserializable Pojo

public class GreetingsListener {

    @StreamListener(GreetingsProcessor.INPUT)
    public void handleGreetings(@Payload Greetings request) {
     
    }
}

Solution

  • You can try something like this:

    @StreamListener(GreetingsProcessor.INPUT)
    public void handleGreetings(@Payload Greetings request, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)String  key) {
    
    }
    

    You need to provide a proper deserializer for the key. For e.g. if your key is String, then you can provide:

    spring.cloud.stream.kafka.binder.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    If there is a need to use different key deserializer for different input channels, this setting can be extended under producer section of each kafka bindings. For example:

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              <channel_name>:
                consumer:
                  startOffset: latest
                  autoCommitOffset: true
                  autoCommitOnError: true
                  configuration:
                    key.deserializer: org.apache.kafka.common.serialization.StringDeserializer