I need to send a message to a specific Kafka topic. I use the following KafkaTemplate to do this: KafkaTemplate<String, RequestDto>
The Kafka producer is configured with the following properties:
private ProducerFactory<String, RequestDto> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}
Creating the KafkaTemplate:
public KafkaTemplate<String, RequestDto> kafkaTemplate() {
    KafkaTemplate<String, RequestDto> template = new KafkaTemplate<>(producerConfigs());
    template.setMessageConverter(new StringJsonMessageConverter());
    return template;
}
When I call the "send" method, the message is delivered to the Kafka topic, but a header is sent along with it that contains the path (the fully qualified class name) of the request DTO.
ListenableFuture<SendResult<String, RequestDto>> result = kafkaTemplate.send(topic, requestDto);
[Screenshot: example header in Offset Explorer]
This header breaks the consuming application, which I have no way of changing. Is there any way to remove this header from the message?
You can create the DefaultKafkaProducerFactory with a JsonSerializer instance that has ADD_TYPE_INFO_HEADERS set to false:
JsonSerializer<RequestDto> jsonSerializer = new JsonSerializer<>();
jsonSerializer.setAddTypeInfo(false); // Set to false to disable adding type info headers.
Then create the DefaultKafkaProducerFactory with explicit key and value serializers:
return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), jsonSerializer);
You can also disable the header on the producer side through a configuration property:
JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
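As a rough sketch of the property-based route, the producer map from the question could carry one extra entry. To keep the snippet runnable without Spring on the classpath, it uses literal keys: "spring.json.add.type.headers" is the string value behind JsonSerializer.ADD_TYPE_INFO_HEADERS, and the other keys are the standard Kafka producer settings. The class name ProducerPropsSketch and the method producerProps are hypothetical, chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the producer properties with type headers disabled.
class ProducerPropsSketch {

    static Map<String, Object> producerProps(String bootstrapServers, String clientId) {
        Map<String, Object> props = new LinkedHashMap<>();
        // Same settings as in the question, written as literal keys
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", clientId);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        // Literal value of JsonSerializer.ADD_TYPE_INFO_HEADERS:
        // false tells the serializer not to add type info headers
        props.put("spring.json.add.type.headers", false);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and client id, for illustration only
        System.out.println(producerProps("localhost:9092", "demo-client"));
    }
}
```

In the question's producerConfigs() method this would amount to a single additional line, props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), before the factory is created. The property is read when the serializer is configured from the map, which matches the question's setup where the serializer is given as a class rather than an instance.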
And to ignore the headers on the consumer side:
JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.
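For completeness, here is a minimal sketch of the consumer-side variant, which only applies when the consumer's configuration is under your control (not the case described in the question). It again uses literal keys so it runs without Spring: "spring.json.use.type.headers" is the value behind JsonDeserializer.USE_TYPE_INFO_HEADERS, and "spring.json.value.default.type" is the value behind JsonDeserializer.VALUE_DEFAULT_TYPE, which tells the deserializer what class to produce once the headers are ignored. ConsumerPropsSketch, consumerProps, and the class name com.example.RequestDto are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties that ignore type info headers.
class ConsumerPropsSketch {

    static Map<String, Object> consumerProps(String bootstrapServers,
                                             String groupId,
                                             String defaultType) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Literal value of JsonDeserializer.USE_TYPE_INFO_HEADERS:
        // false makes the deserializer ignore headers set by the serializer
        props.put("spring.json.use.type.headers", false);
        // Literal value of JsonDeserializer.VALUE_DEFAULT_TYPE:
        // the target class to use when no type header is consulted
        props.put("spring.json.value.default.type", defaultType);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; com.example.RequestDto is an assumed class name
        System.out.println(
                consumerProps("localhost:9092", "demo-group", "com.example.RequestDto"));
    }
}
```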