spring-kafka, spring-cloud-stream, spring-cloud-function

Spring Cloud Kafka Streams Dynamic Message Conversion based on Header info


I'm trying to use Spring Cloud Kafka Streams to process messages from a Kafka topic that contains different types of messages. For instance, we receive a JSON message from the topic that can be either a Type A or a Type B message. The producer adds the message type to a header. Is there a way to read that header within the functional binder and convert the message accordingly? Alternatively, is there a "choice" option for branching as messages come in, to route each message to the right converter?


Solution

  • If you configure the binding to use nativeDecoding, deserialization is done by Kafka (via the value.deserializer consumer property).

    spring-kafka provides a JsonDeserializer which looks for type information in specific headers (set by a corresponding JsonSerializer).

    It also provides a DelegatingDeserializer which allows you to select which deserializer to use based on the value in a spring.kafka.serialization.selector header.

    See the Spring for Apache Kafka Reference Manual for more information.
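
To make the header-based selection concrete, here is a minimal, library-free sketch of the idea: look up the selector header, pick a delegate by its value, and hand the payload to that delegate. This is not the spring-kafka implementation. The class name HeaderRoutingSketch, the TypeA/TypeB classes, and the selector values "typeA" and "typeB" are hypothetical, and the delegates only wrap the raw JSON string where a real deserializer would parse it.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HeaderRoutingSketch {

    // Hypothetical payload types standing in for "Type A" and "Type B".
    static final class TypeA {
        final String json;
        TypeA(String json) { this.json = json; }
    }

    static final class TypeB {
        final String json;
        TypeB(String json) { this.json = json; }
    }

    // Header that carries the selector value, as named in the answer above.
    static final String SELECTOR_HEADER = "spring.kafka.serialization.selector";

    // Selector value -> delegate. The keys are assumptions for illustration;
    // a real delegate would parse the JSON rather than just wrap it.
    static final Map<String, Function<byte[], Object>> DELEGATES = new HashMap<>();
    static {
        DELEGATES.put("typeA", data -> new TypeA(new String(data, StandardCharsets.UTF_8)));
        DELEGATES.put("typeB", data -> new TypeB(new String(data, StandardCharsets.UTF_8)));
    }

    // Picks the delegate named by the selector header and applies it to the payload.
    static Object deserialize(Map<String, byte[]> headers, byte[] data) {
        byte[] raw = headers.get(SELECTOR_HEADER);
        if (raw == null) {
            throw new IllegalArgumentException("Missing header: " + SELECTOR_HEADER);
        }
        String selector = new String(raw, StandardCharsets.UTF_8);
        Function<byte[], Object> delegate = DELEGATES.get(selector);
        if (delegate == null) {
            throw new IllegalArgumentException("No deserializer for selector: " + selector);
        }
        return delegate.apply(data);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(SELECTOR_HEADER, "typeB".getBytes(StandardCharsets.UTF_8));
        Object value = deserialize(headers, "{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(value.getClass().getSimpleName()); // prints "TypeB"
    }
}
```

With the real DelegatingDeserializer you would not write this dispatch yourself. The mapping from selector value to deserializer is supplied through the consumer configuration, and the producer has to set the selector header on each record. The reference manual has the exact property names for your version.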