We are using spring-kafka. Some non-Spring apps that communicate with us don't set the spring_json_header_types header, so we specify that certain headers should be mapped in as Strings by adding them as rawMappedHeaders. We do this for all of our binders by setting spring.cloud.stream.kafka.binder.headerMapperBeanName to a DefaultKafkaHeaderMapper bean on which we've set rawMappedHeaders. This applies to both our producer and consumer bindings.
However, when we send data outbound to some of our Spring applications, we don't want to leave out the spring_json_header_types values for those rawMappedHeaders fields, since the downstream apps have not yet been adjusted for this non-Spring app communication.
Is there a way, through config or code, to apply a headerMapperBeanName to only the producers or only the consumers, so that headers are mapped differently for outbound vs. inbound? Or is there a better way of doing this that the DefaultKafkaHeaderMapper itself can handle?
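There is only one header mapper at the binder level, so the same bean is used for both producer and consumer bindings.
It is probably easiest to create a custom KafkaHeaderMapper implementation that delegates to a different DefaultKafkaHeaderMapper for inbound and outbound mappings. In the interface, fromHeaders handles the outbound direction and toHeaders handles the inbound direction: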
    public interface KafkaHeaderMapper {

        /**
         * Map from the given {@link MessageHeaders} to the specified target headers.
         * @param headers the abstracted MessageHeaders.
         * @param target the native target headers.
         */
        void fromHeaders(MessageHeaders headers, Headers target);

        /**
         * Map from the given native headers to a map of headers for the eventual
         * {@link MessageHeaders}.
         * @param source the native headers.
         * @param target the target headers.
         */
        void toHeaders(Headers source, Map<String, Object> target);

    }
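The delegation idea can be sketched roughly as follows. To keep the example compilable without Spring or Kafka on the classpath, it uses hypothetical stand-ins: `HeaderMapper` mimics `KafkaHeaderMapper` with plain maps in place of `MessageHeaders` and `Headers`, and `StubMapper` only loosely imitates a `DefaultKafkaHeaderMapper` with and without rawMappedHeaders. The names `DirectionalHeaderMapper`, `StubMapper`, and the `tenant` header are made up for illustration. In a real application the class would presumably implement `KafkaHeaderMapper` itself and hold two `DefaultKafkaHeaderMapper` instances, one with `setRawMappedHeaders(...)` applied for inbound and one without it for outbound.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class DirectionalHeaderMapperSketch {

    // Hypothetical stand-in for KafkaHeaderMapper: plain maps replace
    // MessageHeaders and Kafka's Headers so the sketch compiles on its own.
    interface HeaderMapper {
        void fromHeaders(Map<String, Object> headers, Map<String, byte[]> target); // outbound
        void toHeaders(Map<String, byte[]> source, Map<String, Object> target);    // inbound
    }

    // Hypothetical stand-in for a configured DefaultKafkaHeaderMapper.
    // raw == true loosely imitates rawMappedHeaders: values are read back as
    // Strings and no type information is written on the way out.
    static final class StubMapper implements HeaderMapper {
        static final String TYPES_HEADER = "spring_json_header_types";
        private final boolean raw;

        StubMapper(boolean raw) {
            this.raw = raw;
        }

        @Override
        public void fromHeaders(Map<String, Object> headers, Map<String, byte[]> target) {
            headers.forEach((name, value) ->
                    target.put(name, value.toString().getBytes(StandardCharsets.UTF_8)));
            if (!raw) {
                // Placeholder for the JSON type map the real mapper would write.
                target.put(TYPES_HEADER, "{}".getBytes(StandardCharsets.UTF_8));
            }
        }

        @Override
        public void toHeaders(Map<String, byte[]> source, Map<String, Object> target) {
            source.forEach((name, value) ->
                    target.put(name, raw ? new String(value, StandardCharsets.UTF_8) : value));
        }
    }

    // The actual technique: one mapper bean that routes each direction
    // to its own delegate.
    static final class DirectionalHeaderMapper implements HeaderMapper {
        private final HeaderMapper inbound;
        private final HeaderMapper outbound;

        DirectionalHeaderMapper(HeaderMapper inbound, HeaderMapper outbound) {
            this.inbound = inbound;
            this.outbound = outbound;
        }

        @Override
        public void fromHeaders(Map<String, Object> headers, Map<String, byte[]> target) {
            outbound.fromHeaders(headers, target); // producer side
        }

        @Override
        public void toHeaders(Map<String, byte[]> source, Map<String, Object> target) {
            inbound.toHeaders(source, target); // consumer side
        }
    }

    public static void main(String[] args) {
        // Inbound delegate uses raw String mapping; outbound delegate keeps type info.
        HeaderMapper mapper = new DirectionalHeaderMapper(new StubMapper(true), new StubMapper(false));

        Map<String, byte[]> outboundHeaders = new HashMap<>();
        mapper.fromHeaders(Map.of("tenant", "acme"), outboundHeaders);
        System.out.println(outboundHeaders.containsKey(StubMapper.TYPES_HEADER)); // true

        Map<String, Object> inboundHeaders = new HashMap<>();
        mapper.toHeaders(Map.of("tenant", "acme".getBytes(StandardCharsets.UTF_8)), inboundHeaders);
        System.out.println(inboundHeaders.get("tenant")); // acme
    }
}
```

With the real types, the delegating class would be registered as a bean and its name set in spring.cloud.stream.kafka.binder.headerMapperBeanName, so the binder still sees a single mapper while each direction gets its own configuration.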