I am using a Spring Kafka consumer which fetches messages from a topic and persists them into a DB. If a failure condition is met, say for instance the DB is unavailable, does the Kafka consumer library provide a mechanism to retry? If it does, is there a way to retry for 1 minute and after that drop the message to the DLT (I don't want to use an interval time and maxAttempts)?
This is my consumer configuration:
@Configuration
@RequiredArgsConstructor
public class ConsumerConfiguration {

    private final KafkaTemplate<String, TransactionEvent> kafkaTemplate;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        return consumerConfig;
    }

    @Bean
    public ConsumerFactory<String, TransactionEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(TransactionEvent.class)));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TransactionEvent>> kafkaListenerContainerFactory(
            ConsumerFactory<String, TransactionEvent> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, TransactionEvent> listenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        listenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        listenerContainerFactory.setCommonErrorHandler(errorHandler);
        return listenerContainerFactory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(recover(), new FixedBackOff(2000, 5));
    }

    @Bean
    public DeadLetterPublishingRecoverer recover() {
        final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> CUSTOMIZE_DESTINATION_RESOLVER =
                (cr, e) -> new TopicPartition(cr.topic().concat("DLT"), cr.partition());
        return new DeadLetterPublishingRecoverer(kafkaTemplate, CUSTOMIZE_DESTINATION_RESOLVER);
    }
}
And this is the consumer where I save the message:
@Component
@RequiredArgsConstructor
public class SmsConsumer {

    private final TransactionEventRepo transactionEventRepo;

    @KafkaListener(groupId = "group-1", topics = ProducerConfiguration.TOPIC_NAME)
    public void consumeSms(@Payload TransactionEvent transactionEvent) {
        transactionEventRepo.save(transactionEvent);
    }
}
Everything works fine; as you see, I use new FixedBackOff(2000, 5), but I want Kafka to retry for 1 minute without specifying an interval and maxAttempts when an exception happens. If the exception still occurs after 1 minute, it should drop the message to the DLT.
Use an ExponentialBackOff with...
/**
* The maximum elapsed time in milliseconds after which a call to
* {@link BackOffExecution#nextBackOff()} returns {@link BackOffExecution#STOP}.
*/
public void setMaxElapsedTime(long maxElapsedTime) {
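When the elapsed time is exceeded, the DefaultErrorHandler stops retrying and invokes your recoverer, which publishes to the DLT. As a sketch, your errorHandler() bean could look like this; the initial interval, multiplier, and max interval below are example values you can tune, and only setMaxElapsedTime enforces the 1-minute cutoff:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

@Bean
public DefaultErrorHandler errorHandler() {
    // Start at 1s between attempts, doubling each time (example values).
    ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
    // Cap any single wait at 10s so retries keep happening within the window.
    backOff.setMaxInterval(10_000L);
    // After ~60s of retrying, nextBackOff() returns STOP and the
    // DeadLetterPublishingRecoverer sends the record to the DLT.
    backOff.setMaxElapsedTime(60_000L);
    return new DefaultErrorHandler(recover(), backOff);
}
```

Note the cutoff is approximate: the handler checks the elapsed time between attempts, so the last retry may finish slightly after the 1-minute mark.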