Tags: java, spring-boot, apache-kafka, spring-kafka, spring-cloud-stream-binder-kafka

Consuming Kafka messages with their keys in batches using the Spring Cloud Stream Kafka binder


Is it possible to get the key of each Kafka message when consuming messages in batches?

I managed to access the message key by using Message<String> as the input of my consumer function, but this only works in non-batch mode:

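import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import java.util.function.Consumer

@SpringBootApplication
class KafkaSink {

    private val log = LoggerFactory.getLogger(KafkaSink::class.java)

    // Non-batch consumer: each invocation receives one record, so the key header holds a single value.
    @Bean
    fun sink() : Consumer<Message<String>> {
        return Consumer {
            log.info("key: ${it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]} value: ${it.payload}")
        }
    }
}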

When I set the property spring.cloud.stream.bindings.sink-in-0.consumer.batch-mode=true, I can only use List<String> as the argument of the consumer, not List<Message<String>>.


Solution

  • Use Message<List<String>> as the consumer's input type. The KafkaHeaders.RECEIVED_MESSAGE_KEY header is then a List<?> whose entries are in the same order as the payloads; the same applies to all other headers.

    EDIT

    Re the comment below: the default content type is application/json. Adding

          spring:
            cloud:
              stream:
                bindings:
                  sink-in-0:
                    content-type: text/plain

    fixes your test.
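
A minimal sketch of the index-based pairing described in the answer, written in plain Kotlin so it can be tried without a broker. The function name pairKeysWithPayloads and the "key-header" name used in main are made up for illustration; in the real sink the headers would come from the Message and the header name would be KafkaHeaders.RECEIVED_MESSAGE_KEY.

```kotlin
// Pairs each payload of a batch with the key stored at the same index in the key header.
// `headers` is a plain Map here; the header name is passed in so nothing is hard-coded.
fun pairKeysWithPayloads(
    headers: Map<String, Any?>,
    payloads: List<String>,
    keyHeader: String
): List<Pair<Any?, String>> {
    // In batch mode the header value is expected to be a list aligned with the payloads.
    // If it is missing or not a list, every key falls back to null.
    val keys = headers[keyHeader] as? List<*> ?: emptyList<Any?>()
    return payloads.mapIndexed { index, payload -> keys.getOrNull(index) to payload }
}

fun main() {
    // Simulated batch of three records; "key-header" is a hypothetical stand-in
    // for KafkaHeaders.RECEIVED_MESSAGE_KEY. The second record has no key.
    val headers = mapOf<String, Any?>("key-header" to listOf("k1", null, "k3"))
    val payloads = listOf("a", "b", "c")
    for ((key, value) in pairKeysWithPayloads(headers, payloads, "key-header")) {
        println("key: $key value: $value")
    }
}
```

In the sink bean, the consumer type would become Consumer<Message<List<String>>> and the call would look like pairKeysWithPayloads(it.headers, it.payload, KafkaHeaders.RECEIVED_MESSAGE_KEY), since MessageHeaders is itself a Map<String, Object>.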