Search code examples
spring-boot apache-kafka kafka-consumer-api spring-kafka

How to configure Kafka retries to stop after 1 minute without using maxAttempts?


I am using a Spring Kafka consumer which fetches messages from a topic and persists them into a database. If a failure condition is met, for instance the database is unavailable, does the Kafka consumer library provide a mechanism to retry? If it does, is there a way to keep retrying for 1 minute and, after that, send the message to the DLT (I don't want to configure this with an interval and maxAttempts)?

This is my consumer configuration:

/**
 * Spring configuration for the Kafka consumer side: consumer properties,
 * the consumer factory, the listener container factory, and the error
 * handler together with its dead-letter recoverer.
 */
@Configuration
@RequiredArgsConstructor
public class ConsumerConfiguration {

    /** Template passed to the dead-letter recoverer. */
    private final KafkaTemplate<String, TransactionEvent> kafkaTemplate;

    /**
     * Builds the raw consumer properties.
     *
     * @return a mutable map of consumer settings (broker address, group id,
     *         deserializers, auto-commit disabled, {@code read_committed}
     *         isolation, {@code earliest} offset reset)
     */
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();

        // Connection and group membership.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");

        // Key/value deserialization; the value deserializer is wrapped by ErrorHandlingDeserializer.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        // Offset and isolation behaviour.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfig()}, a
     * {@code StringDeserializer} for keys and an
     * {@code ErrorHandlingDeserializer} wrapping a {@code JsonDeserializer}
     * for {@code TransactionEvent} values.
     *
     * @return the consumer factory bean
     */
    @Bean
    public ConsumerFactory<String, TransactionEvent> consumerFactory() {
        JsonDeserializer<TransactionEvent> jsonDeserializer =
                new JsonDeserializer<>(TransactionEvent.class);
        ErrorHandlingDeserializer<TransactionEvent> valueDeserializer =
                new ErrorHandlingDeserializer<>(jsonDeserializer);

        return new DefaultKafkaConsumerFactory<>(
                consumerConfig(), new StringDeserializer(), valueDeserializer);
    }

    /**
     * Creates the listener container factory.
     *
     * @param consumerFactory the consumer factory to install on the container factory
     * @param errorHandler    the error handler to install as the common error handler
     * @return a container factory with ack mode {@code RECORD}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TransactionEvent>> kafkaListenerContainerFactory(
            ConsumerFactory<String, TransactionEvent> consumerFactory,
            DefaultErrorHandler errorHandler
    ) {
        ConcurrentKafkaListenerContainerFactory<String, TransactionEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);

        return factory;
    }

    /**
     * Creates the error handler: a fixed back-off constructed with
     * {@code (2000, 5)} (interval in milliseconds, max attempts) combined
     * with the dead-letter recoverer from {@link #recover()}.
     *
     * @return the error handler bean
     */
    @Bean
    public DefaultErrorHandler errorHandler() {
        DeadLetterPublishingRecoverer recoverer = recover();
        FixedBackOff backOff = new FixedBackOff(2000, 5);
        return new DefaultErrorHandler(recoverer, backOff);
    }

    /**
     * Creates the dead-letter recoverer. The destination is the original
     * topic name with {@code "DLT"} appended, on the same partition number
     * as the failed record.
     *
     * @return the recoverer bean
     */
    @Bean
    public DeadLetterPublishingRecoverer recover() {
        final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver =
                (failedRecord, cause) ->
                        new TopicPartition(failedRecord.topic().concat("DLT"), failedRecord.partition());

        return new DeadLetterPublishingRecoverer(kafkaTemplate, destinationResolver);
    }
}

And this is the consumer where I save the message:

/**
 * Kafka listener component that stores every received
 * {@code TransactionEvent} through the injected repository.
 */
@Component
@RequiredArgsConstructor
public class SmsConsumer {

    /** Repository used to save incoming events. */
    private final TransactionEventRepo transactionEventRepo;

    /**
     * Receives a record from {@code ProducerConfiguration.TOPIC_NAME} as a
     * member of consumer group {@code group-1} and saves its payload.
     * Nothing is caught here, so any exception thrown by {@code save}
     * propagates to the caller.
     *
     * @param transactionEvent the deserialized message payload
     */
    @KafkaListener(topics = ProducerConfiguration.TOPIC_NAME, groupId = "group-1")
    public void consumeSms(@Payload TransactionEvent transactionEvent) {
        transactionEventRepo.save(transactionEvent);
    }
}

Everything works fine. As you can see, I use new FixedBackOff(2000,5), but I want Kafka to keep retrying for 1 minute, without specifying an interval and maxAttempts, if an exception happens. If the exception still occurs after 1 minute, the message should be sent to the DLT.


Solution

  • Use an ExponentialBackOff with...

    /**
     * The maximum elapsed time in milliseconds after which a call to
     * {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}.
     */
    public void setMaxElapsedTime(long maxElapsedTime) {