java, apache-kafka, spring-kafka

Access Exception thrown during NonBlocking retry in DltHandler


I want to know whether it is possible to get hold of the original exception thrown in the @KafkaListener method during DLT processing, i.e., in the @DltHandler method.

For example, I have a method like this:

@KafkaListener(topics = "message", groupId = "1")
@RetryableTopic(attempts = "3", backoff = @Backoff(value = 3000L))
public void readMessages(@Payload String message) {
     // Some exception is thrown here
}

I have a DLT handler in the same class, like this:

@DltHandler
public void processDltMessages(String message) {
    // I want custom logic here, based on the exception that was thrown.
    // How can I find out which exception the original method threw?
}

Solution

  • The original exception object is not available, but information about the exception is carried in the message headers. Either consume Message<String> msg in the DLT handler (and use msg.getHeaders().get(...)), or extract the headers individually, as shown in this question:

    @DltHandler
    void handler(String message,
            @Header(KafkaHeaders.ORIGINAL_OFFSET) byte[] offset,
            @Header(KafkaHeaders.EXCEPTION_FQCN) String descException,
            @Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stacktrace,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
        System.out.println(message);
        System.out.println(ByteBuffer.wrap(offset).getLong());
        System.out.println(descException);
        System.out.println(stacktrace);
        System.out.println(errorMessage);
    }
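
Since the handler only receives the exception's class name as a String, the "custom logic based on the exception" from the question has to branch on that name. Below is a minimal sketch of one way to do it in plain Java. DltExceptionRouting, Action, and chooseAction are hypothetical names and not part of Spring Kafka, and the mapping from exception types to actions is only an illustration. The sketch assumes the exception class is on the DLT consumer's classpath and falls back to a default otherwise.

```java
// Hypothetical helper, not part of Spring Kafka: maps the class name found in
// the exception header to an application-specific action.
final class DltExceptionRouting {

    // Illustrative actions; a real application would define its own.
    enum Action { DISCARD, ALERT, PARK }

    static Action chooseAction(String exceptionFqcn) {
        if (exceptionFqcn == null) {
            return Action.PARK; // header missing: fall back to the default
        }
        Class<?> type;
        try {
            // Load without initializing, so no static initializers run.
            type = Class.forName(exceptionFqcn, false,
                    DltExceptionRouting.class.getClassLoader());
        } catch (ClassNotFoundException | LinkageError e) {
            return Action.PARK; // class not available in this application
        }
        // isAssignableFrom also matches subclasses, unlike a string comparison.
        if (IllegalArgumentException.class.isAssignableFrom(type)) {
            return Action.DISCARD;
        }
        if (java.io.IOException.class.isAssignableFrom(type)) {
            return Action.ALERT;
        }
        return Action.PARK;
    }
}
```

Inside the handler above, this would be called as DltExceptionRouting.chooseAction(descException). It is worth logging the header value first: depending on the setup, the class named there may be a framework wrapper rather than the exception thrown in the listener, in which case the branching should be adjusted to match.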