I'm trying to use Spring Cloud Kafka Streams to process messages from a Kafka topic that contains different types of messages. For instance, we receive a JSON message from the topic that can be either a Type A or a Type B message. The producer adds the message type in a header. Is there a way to read that header within the functional binder and convert the message accordingly? Alternatively, is there a "Choice" option for branching as messages come in, to route each message to the right converter?
If you configure the binding to use nativeDecoding, the deserialization is done by Kafka (via the value.deserializer consumer property).
spring-kafka provides a JsonDeserializer which looks for type information in specific headers (set by a corresponding JsonSerializer).
It also provides a DelegatingDeserializer which allows you to select which deserializer to use based on the value in a spring.kafka.serialization.selector header.
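To make the idea of header-based selection concrete, here is a minimal plain-Java sketch of the dispatch pattern. It is not spring-kafka's implementation and uses none of its classes: HeaderSelectingDeserializer, TypeA, TypeB, and the selector values "typeA" and "typeB" are all hypothetical names chosen for illustration, and a simple Map stands in for Kafka's record headers. Only the header name is taken from the text above.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration of header-based deserializer selection.
// All class and method names here are hypothetical, not spring-kafka API.
final class HeaderSelectingDeserializer {

    // Hypothetical payload types standing in for "Type A" and "Type B".
    static final class TypeA {
        final String body;
        TypeA(String body) { this.body = body; }
    }

    static final class TypeB {
        final String body;
        TypeB(String body) { this.body = body; }
    }

    private final String selectorHeader;
    private final Map<String, Function<byte[], Object>> delegates;

    HeaderSelectingDeserializer(String selectorHeader,
                                Map<String, Function<byte[], Object>> delegates) {
        this.selectorHeader = selectorHeader;
        this.delegates = new HashMap<>(delegates);
    }

    // headers maps header name to raw header bytes (a stand-in for Kafka record headers).
    Object deserialize(Map<String, byte[]> headers, byte[] payload) {
        byte[] raw = headers.get(selectorHeader);
        if (raw == null) {
            throw new IllegalArgumentException("Missing selector header: " + selectorHeader);
        }
        String selector = new String(raw, StandardCharsets.UTF_8);
        Function<byte[], Object> delegate = delegates.get(selector);
        if (delegate == null) {
            throw new IllegalArgumentException("No deserializer registered for: " + selector);
        }
        // Hand the payload to whichever delegate the header value selected.
        return delegate.apply(payload);
    }

    public static void main(String[] args) {
        Map<String, Function<byte[], Object>> delegates = new HashMap<>();
        delegates.put("typeA", bytes -> new TypeA(new String(bytes, StandardCharsets.UTF_8)));
        delegates.put("typeB", bytes -> new TypeB(new String(bytes, StandardCharsets.UTF_8)));

        HeaderSelectingDeserializer deserializer =
                new HeaderSelectingDeserializer("spring.kafka.serialization.selector", delegates);

        Map<String, byte[]> headers = new HashMap<>();
        headers.put("spring.kafka.serialization.selector",
                "typeB".getBytes(StandardCharsets.UTF_8));

        Object value = deserializer.deserialize(
                headers, "{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(value.getClass().getSimpleName());
    }
}
```

In a real application the delegates would be actual Kafka Deserializer implementations and the selection would happen inside the configured value.deserializer, so the function receives an already-converted object.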
See the Spring for Apache Kafka Reference Manual for more information.