Is it somehow possible to get the key of the kafka message when consuming them as batches?
I managed to access the message key when using Message<String>
as input of my consumer function, but this only works in non-batch-mode:
@SpringBootApplication
class KafkaSink {
private val log = logger()
@Bean
fun sink() : Consumer<Message<String>> {
return Consumer {
log.info("key: ${it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]} value: ${it.payload}")
}
}
}
When setting the property spring.cloud.stream.binding.sink.consumer.batch-mode=true
I can only use List<String>
as argument for the consumer but not List<Message<String>>
Use Message<List<String>>
; then, the KafkaHeaders.RECEIVED_MESSAGE_KEY
is a List<?>
in the same order as the payloads - same thing for all other headers.
EDIT
Re comment below; the default content-type is application/json
. Adding
bindings:
sink-in-0:
content-type: text/plain
fixes your test.