I need to set a custom header on a Kafka message. My Kafka cluster natively supports headers (1.x.x), and I am currently using springCloudVersion=Finchley.RELEASE.
When I set the property
default:
  producer:
    headerMode: none
none of the headers appear in the output.
On the other hand, if I set
default:
  producer:
    headerMode: headers
many headers, including contentType and spring_json_header_types, are added to the message, which greatly affects throughput. Since Kafka is a language- and framework-agnostic message delivery mechanism, I feel Spring should provide a way to include only user-provided headers. Is there any workaround to send only user-set headers to the Kafka topic while suppressing all the Spring Cloud related headers?
I already answered this on GitHub. It's a waste of your time and ours to ask the same question in multiple places. As I said there, Stack Overflow is the preferred place to ask questions.
Here's the answer again:
Please use Stack Overflow to ask questions; GitHub issues are for reporting bugs or asking for new features.
You can provide a custom header mapper in the binder configuration.
spring.cloud.stream.kafka.binder.headerMapperBeanName
The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.
Default: none.
Implement your own KafkaHeaderMapper, add it as a @Bean, and configure the binder to use it.
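As a rough illustration of the filtering such a custom mapper could apply, here is a minimal sketch in plain Java. It is not the Spring Kafka API: plain maps stand in for MessageHeaders and Kafka's Headers so the example runs without Spring on the classpath, and the class name UserHeaderFilter and its allow-list are hypothetical. In a real application this logic would presumably live in the fromHeaders(MessageHeaders, Headers) and toHeaders(Headers, Map<String, Object>) methods of a KafkaHeaderMapper implementation, exposed as a @Bean whose name is set in spring.cloud.stream.kafka.binder.headerMapperBeanName.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not part of Spring): keeps only allow-listed headers
 * and writes them as raw bytes, with no extra type-information header.
 */
final class UserHeaderFilter {

    private final Set<String> allowed;

    UserHeaderFilter(Set<String> allowedHeaderNames) {
        // Defensive copy so later changes to the caller's set have no effect.
        this.allowed = new HashSet<>(allowedHeaderNames);
    }

    /** Outbound direction: message headers to Kafka header bytes. */
    Map<String, byte[]> toKafka(Map<String, Object> messageHeaders) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : messageHeaders.entrySet()) {
            if (!allowed.contains(entry.getKey())) {
                continue; // drops contentType, spring_json_header_types, etc.
            }
            Object value = entry.getValue();
            if (value instanceof String) {
                out.put(entry.getKey(), ((String) value).getBytes(StandardCharsets.UTF_8));
            } else if (value instanceof byte[]) {
                out.put(entry.getKey(), (byte[]) value);
            }
            // Other value types are skipped in this sketch.
        }
        return out;
    }

    /** Inbound direction: allow-listed Kafka header bytes back to Strings. */
    Map<String, Object> fromKafka(Map<String, byte[]> kafkaHeaders) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : kafkaHeaders.entrySet()) {
            if (allowed.contains(entry.getKey())) {
                out.put(entry.getKey(), new String(entry.getValue(), StandardCharsets.UTF_8));
            }
        }
        return out;
    }
}
```

With an allow-list containing only "tenant" (a made-up header name), a header map holding tenant, contentType and spring_json_header_types is reduced to the single tenant entry, so consumers written in other languages see just the plain UTF-8 bytes of the user-set header.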
EDIT
The DefaultKafkaHeaderMapper has a property to enable converting byte[] headers to String.
/**
* Set the headers to not perform any conversion on (except {@code String} to
* {@code byte[]} for outbound). Inbound headers that match will be mapped as
* {@code byte[]} unless the corresponding boolean in the map value is true,
* in which case it will be mapped as a String.
* @param rawMappedHeaders the header names to not convert and
* @since 2.2.5
* @see #setCharset(Charset)
* @see #setMapAllStringsOut(boolean)
*/
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {