I have a simple Spring Cloud Stream project using Spring Integration DSL flows and the Kafka binder. Everything works great, but message header values coming from Kafka arrive as byte[].

This means that my SI @Header parameters need to be of type byte[]. That works, but it'd be nice to have them as Strings (all the inbound headers I care about are String values).
I've configured the Kafka clients to use StringSerializer/StringDeserializer. I assume I also need to somehow tell Spring Kafka which headers to map as Strings and what character encoding to use.
I'm obviously missing something here. Any tips?
Set the binder property headerMapperBeanName to the bean name of a DefaultKafkaHeaderMapper bean.
spring.cloud.stream.kafka.binder.headerMapperBeanName
The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.
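For example, assuming the mapper bean is named myHeaderMapper (an illustrative name, not anything required), the binder property could be set in application.properties like this:

spring.cloud.stream.kafka.binder.headerMapperBeanName=myHeaderMapper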
You can then tell the mapper which headers should be mapped as Strings:
/**
* Set the headers to not perform any conversion on (except {@code String} to
* {@code byte[]} for outbound). Inbound headers that match will be mapped as
* {@code byte[]} unless the corresponding boolean in the map value is true,
* in which case it will be mapped as a String.
* @param rawMappedHeaders the header names to not convert and
* @since 2.2.5
* @see #setCharset(Charset)
* @see #setMapAllStringsOut(boolean)
*/
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {
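Putting it together, here is a minimal sketch of such a bean, assuming a hypothetical inbound header named my-header that should arrive as a String (per the Javadoc above, true in the map value means the matching inbound header is mapped as a String rather than byte[]):

import java.util.Collections;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;

@Configuration
public class HeaderMapperConfig {

    // Referenced by spring.cloud.stream.kafka.binder.headerMapperBeanName=myHeaderMapper
    @Bean
    public DefaultKafkaHeaderMapper myHeaderMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        // Inbound "my-header" is mapped as a String; false would leave it as byte[]
        mapper.setRawMappedHeaders(Collections.singletonMap("my-header", true));
        return mapper;
    }
}

If the default character encoding does not fit, it can be changed via the mapper's setCharset(...) method (see the @see reference in the Javadoc above), which also addresses the encoding part of the question.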