I am writing a kafka consumer.I have set the Acknowledged property to manual. So whenever consumer failed to process the message I don't do ack. Now I want the consumer to process this failed message immediately, but it doesn't.
This is my consumer config class looks like.
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "${crmdsforecast.judjement-consumer-groupId}");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest" );
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
System.out.println("Error while processing the record {}"+ exception.getCause().getMessage());
}, new FixedBackOff(3000L, 2L));
factory.setErrorHandler(errorHandler);
return factory;
}
My Consumer method looks like this.
@KafkaListener( containerFactory = "kafkaListenerContainerFactory",id = "${id}", topics = "${topicname}")
public void consume(String message,Acknowledgment acknowledgment){
Dto payload = new Dto();
try{
payload = payloadDeserializer.convertIntoDtoObject(message);
if(payload != null)
//Do Something;
acknowledgment.acknowledge();
}
catch(JsonProcessingException e){
log.error("Error occured while Deserializing the String input {}",message);
acknowledgment.acknowledge();
}
catch(Exception e){
log.error("Some error occured while updating revenueLines {}",e.getMessage());
//Here I expect that if error comes consumer should reread the message.
}
}
I tried multiple solution including SeekToCurrentBatchErrorHandler
. I changed the containerFactory method to below.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
errorHandler.setBackOff( new ExponentialBackOff(500L, 2L));
factory.setBatchErrorHandler(errorHandler);
factory.setBatchListener(true);
factory.setConsumerFactory(consumerFactory());
return factory;
}
I also tried using DefaultErrorHandler
but creating separate bean and setting up commonErrorHandler but it didn't work.
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(1000, 3);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName()));
}, fixedBackOff);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
return errorHandler;
}
Can anyone helps with this ?
Kafka maintains 2 pointers, committed offset and current position; they are unrelated, except that the position is set to the committed offset when a consumer first starts.
Not acknowledging a record will not cause it to be redelivered because the position doesn't change.
You are catching the exceptions.
You need to throw the exception to the container; the default error handler will perform a seek to reposition the partition and redeliver the record.
See the documentation for Spring for Apache Kafka.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling