Search code examples
error-handlingapache-kafkadeserializationquarkussmallrye

Quarkus Kafka Deserializing Exception to Dead Letter Queue


In order to satisfy reliability of my service, I need to push all incoming messages, that could not be deserialized, into a dead-letter topic using kafka-smallrye and quarkus.

All the messages on the topic should be in avro format (but I could not be sure) with a define schema on a schema-registry.

I have set the configuration of my consumer in this way:

mp:
  messaging:
    incoming:
      test-in:
        connector: smallrye-kafka
        group:
          id: test-in-consumer-group
        topic: events-topic
        failure-strategy: dead-letter-queue
        schema:
          registry:
            url: http://localhost:8081
        value:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        key:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        specific:
          avro:
            reader: true

My consumer code:

@ApplicationScoped
public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> store(KafkaRecord<Key,SpecificRecord> data ){
            String schemaFullName = data.getPayload().getSchema().getFullName();
            System.out.println(schemaFullName);

            // other consumer code
            return data.ack();
    }
}

When the consumer can't deserialize a message, the consuming process is blocked instead move the message to the dead letter and continue. I suppose that a deserialization error not produce an nack so the message could not be moved to the dead-letter.

There is a way to move not deserilizable message to the dead letter topic?


Solution

  • i resolved using DeserializationFailureHandler. You have to use the topic of "dead-letter-queue" as normal topic and send your failure pushing.

    @ApplicationScoped
    @Identifier("failure-dead-letter") // Set the name of the failure handler
    public class MyDeserializationFailureHandler
        implements DeserializationFailureHandler<CustomBean> { // Specify the expected type
    
        private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);
    
        @Inject
        @Channel("dead-letter")
        Emitter<DeadLetterBean> deadLetterBeanEmitter;
    
        @Override
        public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
                                               Exception exception, Headers headers) {
            LOGGER.error("ERROR: " + exception.getMessage());
            deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
                                              .withAck(() -> {
                                                  // Called when the message is acked
                                                  LOGGER.error("SENT TO DEAD LETTER");
                                                  return CompletableFuture.completedFuture(null);
                                              })
    
                                              .withNack(throwable -> {
                                                  // Called when the message is nacked
                                                  LOGGER.error("ERROR, NOT SENT DEAD LETTER");
                                                  return CompletableFuture.completedFuture(null);
                                              }));
            return null;
        }
    }
    

    also register the new topic as publisher,and the deserialization-failure-handler

        mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
        mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
        mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter