Using Spring Boot 3.0.6
I was writing a "Spring Boot Kafka" poison-pill example to make sure bad messages get put onto a DLT. I have 2 scenarios: the message fails to deserialize, or it deserializes successfully but then fails a business-logic check.
Note: in this example test, I am sending plain JSON which will not contain header information about the type.
application.properties
management.endpoints.web.exposure.include=health
spring.application.name=sb-kafka-consumer
spring.kafka.bootstrap-servers=<bootstrap address>
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.client-id=${spring.application.name}
spring.kafka.consumer.group-id=${spring.application.name}-group
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.dto.Order
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.dto
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.topic.name.my-topic=my-topic
spring.kafka.topic.name.my-topic-dlt=my-topic.dlt
Config.java
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;
import com.example.dto.Order;
import java.util.Map;
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class Config {
private final KafkaProperties kafkaProperties;
@Value("${spring.kafka.topic.name.my-topic-dlt}")
private String dlt;
@Bean
public DefaultErrorHandler defaultErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0L, 0L));
}
@Bean
public DeadLetterPublishingRecoverer publisher() {
KafkaTemplate<String, Object> bytesKafkaTemplate = new KafkaTemplate<>(producerFactory());
return new DeadLetterPublishingRecoverer(
bytesKafkaTemplate,
(record, ex) -> new TopicPartition(dlt, record.partition()));
}
private ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(),
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Order.class, new JsonSerializer<>())));
}
private ConsumerFactory<String, byte[]> bytesArrayConsumerFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new ByteArrayDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> bytesArrayListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(bytesArrayConsumerFactory());
return factory;
}
}
Consumer.java - note the business-logic exception that can be thrown
import com.example.dto.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class Consumer {
@KafkaListener(
topics="${spring.kafka.topic.name.my-topic}",
groupId="${spring.kafka.consumer.group-id}",
clientIdPrefix="first"
)
public void listen(@Payload Order order) {
log.info("Consumed order: {} with price {}", order.getName(), order.getPrice());
if(order.getPrice() == 0) {
log.info("##### OH DEAR !! Price should never be 0 !!!");
throw new RuntimeException("##### Price should never be 0 !!!");
}
}
}
DltConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class DltConsumer {
@KafkaListener(
topics="${spring.kafka.topic.name.my-topic-dlt}",
groupId="${spring.kafka.consumer.group-id}",
clientIdPrefix="second",
containerFactory = "bytesArrayListenerContainerFactory")
public void recoverDLT(@Payload ConsumerRecord<String, byte[]> consumerRecord) {
log.info("Poison pill value: {}", new String(consumerRecord.value()));
}
}
When trying to get the code to work for these 2 scenarios, I ended up having to use the DelegatingByTypeSerializer, as the DLT producer receives either a byte[] (when the received message fails to deserialize) or an Order (when deserialization succeeds but the handling of the message fails).
I still have a few niggling issues that I can't seem to resolve:
This is my test:
.\kafka-console-producer.bat --broker-list $KAFKA_ENDPOINT --topic my-topic
>{ "name": "monitor", "price": "xy" }
>{ "name": "monitor", "price": 0 }
>{ "name": "monitor", "missingprice": 12 }
This is the output
DltConsumer : Poison pill value: { "name": "monitor", "price": "xy" }
Consumer : Consumed order: monitor with price 0
Consumer : ##### OH DEAR !! Price should never be 0 !!!
DltConsumer : Poison pill value: {"name":"monitor","price":0}
Consumer : Consumed order: monitor with price 0
Consumer : ##### OH DEAR !! Price should never be 0 !!!
DltConsumer : Poison pill value: {"name":"monitor","price":0}
1. Unlike the CommonLoggingErrorHandler, this solution does NOT seem to log the exception that caused the problem. From the above test data, I would expect to see a Jackson deserialization exception when it tries to map "xy" into an Integer for the price, and a RuntimeException when the received price is 0 (business logic). Note: I do see those exceptions when I use the CommonLoggingErrorHandler instead of the DefaultErrorHandler.
2. Deserialization did not fail when the "price" field was not present; as that defaults to 0, the code then failed the business logic. In this example, can I get Kafka to fail JSON deserialization when fields are missing?
3. Because of the problem in (2), the message sent to the DLQ was not a true representation of the received message, i.e. it had been deserialized and then re-serialized internally. Is there any way to put the original message on the DLQ?
Use of a delegating serializer is correct for this use case.

1. If you want to log the exception as well as sending to a DLT, simply subclass the DeadLetterPublishingRecoverer; override the accept() method; log the exception and call super.accept().
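A minimal sketch of such a subclass (the class name and log message are my own; the constructor simply mirrors the template/destination-resolver pair already used in Config):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

import java.util.function.BiFunction;

public class LoggingDeadLetterPublishingRecoverer extends DeadLetterPublishingRecoverer {

    private static final Logger log = LoggerFactory.getLogger(LoggingDeadLetterPublishingRecoverer.class);

    public LoggingDeadLetterPublishingRecoverer(KafkaOperations<?, ?> template,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        super(template, destinationResolver);
    }

    @Override
    public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Exception exception) {
        // Log the cause before the record is forwarded to the DLT
        log.error("Record from {}-{}@{} is being sent to the DLT",
                record.topic(), record.partition(), record.offset(), exception);
        super.accept(record, consumer, exception);
    }
}
```

The publisher() bean in Config would then instantiate this class instead of DeadLetterPublishingRecoverer directly.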
2. See this answer: Jackson - Required property? You can configure Jackson to detect a missing field, but not to validate the value; you can use validation for that instead: https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-validation
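As a sketch of the missing-field part, assuming the Order DTO is rewritten to use creator properties (Jackson only enforces required = true on @JsonCreator parameters, not on plain setters):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Order {

    private final String name;
    private final int price;

    @JsonCreator
    public Order(@JsonProperty(value = "name", required = true) String name,
                 @JsonProperty(value = "price", required = true) int price) {
        this.name = name;
        this.price = price;
    }

    public String getName() { return name; }
    public int getPrice() { return price; }
}
```

With this in place, a payload that omits "price" entirely fails deserialization with a MismatchedInputException instead of silently defaulting the price to 0.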
3. One solution would be to subclass the JsonDeserializer, call super.deserialize(), perform the validation on the result, and throw an exception if it fails validation.
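A sketch of that approach, reusing the price check from the listener; the Order class here is a stand-in for the question's com.example.dto.Order, and the deserializer would be configured as the spring.deserializer.value.delegate.class in place of the plain JsonDeserializer:

```java
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Stand-in for the question's com.example.dto.Order
class Order {
    private String name;
    private int price;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getPrice() { return price; }
    public void setPrice(int price) { this.price = price; }
}

public class ValidatingOrderDeserializer extends JsonDeserializer<Order> {

    public ValidatingOrderDeserializer() {
        super(Order.class);
    }

    @Override
    public Order deserialize(String topic, Headers headers, byte[] data) {
        return validate(super.deserialize(topic, headers, data));
    }

    @Override
    public Order deserialize(String topic, byte[] data) {
        return validate(super.deserialize(topic, data));
    }

    private Order validate(Order order) {
        // Throwing here fails consumer-side deserialization, so the
        // ErrorHandlingDeserializer hands the failure to the error handler
        if (order != null && order.getPrice() == 0) {
            throw new SerializationException("Price should never be 0: " + order.getName());
        }
        return order;
    }
}
```

Since the failure now happens during value deserialization, this should also help with point (3): the DLT receives the original byte[] payload rather than a re-serialized copy.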