Search code examples
javaapache-kafkaspring-kafka

Kafka DefaultErrorHandler Seek to current after exception


In my spring boot consumer using kafka I have configured DefaultErrorHandler to retry failed event a couple of times.

Now, the issue is that after each retry I am seeing a stackTrace of the exception which ideally should not be logged (or if anything I am missing please point out).

My factory code :

    public <K, V> ConcurrentKafkaListenerContainerFactory<K, V> createDlqContainerFactory(
        ConsumerFactory<K, V> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    FixedBackOff fixedBackOff = new FixedBackOff(dlqRetryInterval, dlqMaxAttempts);
    ConsumerRecordRecoverer recovery = (record, ex) -> {
        log.error("Final retry failed for DLQ record: topic={}, partition={}, offset={}",
                record.topic(), record.partition(), record.offset(), ex);
    };
    factory.setCommonErrorHandler(new DefaultErrorHandler(recovery, fixedBackOff));
    return factory;
}

Ideally I would want only the Final retry error log, but instead I am getting a stacktrace after each retry

    2024-12-25 11:31:57,003 ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] , KafkaMessageListenerContainer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:227)
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713)
    at io.micrometer.observation.Observation.observe(Observation.java:565)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.transaction.processing.service.consumer.StockEventConsumer.consume(com.transaction.processing.service.model.eventmetadata.EventMetaData)' threw exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701)
    ... 10 common frames omitted
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800)
Caused by: java.lang.RuntimeException: My Exception Message

spring-kafka: 3.2.4

Consumer config:

    @Bean
public ConsumerFactory<String, EventMetaData> stockConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EventMetaData.class));
}

@Bean(name = "commonKafkaTemplate")
public KafkaTemplate<String, EventMetaData> kafkaTemplate(
        @Value("${kafka.common.brokers}") String brokers) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    ProducerFactory<String, EventMetaData> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
    return new KafkaTemplate<>(producerFactory);
}

Is there any solution so that I am able to handle this


Solution

  • Looks like your concern has been mitigated in the latest Spring for Apache Kafka 3.3.0: https://github.com/spring-projects/spring-kafka/issues/3409.

    So, there is now a new RecordInRetryException and no extra logs in between retries as you would expect.

    Since this is a breaking change in the behavior we did not back-port the fix into 3.2.x.