I have a Java Service that reads messages from Kafka. The service is very simple. I have a listener:
@KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.topic.group}", containerFactory = "mesagueKafkaListenerContainerFactory")
Then I have this configuration:
@Autowired
private KafkaProperty kafkaProperty;

private ConsumerFactory<String, KafkaMessage> mesagueConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaProperty.getGroupIdTest());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(KafkaMessage.class));
}

private ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
    return new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new JsonDeserializer<>(KafkaMessage.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> mesagueKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(mesagueConsumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setErrorHandler(new KafkaErrorHandler());
    return factory;
}
And then:
kafka:
  topic:
    name: name
    group: 1
  bootstrap:
    servers: xxx
  autoCommit: false
When I send a message to the topic, the service processes it correctly.
But when I restart the service, it reads all the messages from the topic again, including the ones it has already processed.
I only want to process new messages that have not been processed yet.
Thanks in advance.
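You are setting enable.auto.commit to false, and you are not committing the offsets manually. This means the consumer never tells Kafka "hey, I've finished consuming these messages". So the next time you restart the consumer, it resumes from the last committed offset. Since nothing was ever committed for your group, there is no such offset, and the starting position falls back to whatever auto.offset.reset resolves to, which in your case is apparently the beginning of the topic. If you want to keep enable.auto.commit set to false, you need to acknowledge each message when you receive it, which is also what AckMode.MANUAL_IMMEDIATE in your container factory expects: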
@KafkaListener(....)
public void consume(ConsumerRecord<?, ?> consumerRecord,
        Acknowledgment acknowledgment) {
    // Consume record

    // Commit this record's offset so it is not delivered again after a restart
    acknowledgment.acknowledge();
}
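To see why the missing acknowledgment causes the re-read, here is a minimal in-memory sketch of the offset bookkeeping. It does not use the Kafka or Spring Kafka API: the class OffsetCommitDemo, its methods, and the group names are all hypothetical, and it assumes a single partition and a start-from-the-beginning policy when no offset has been committed (roughly auto.offset.reset=earliest).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one single-partition topic plus the committed offsets per group. Not the Kafka API. */
public class OffsetCommitDemo {

    // Messages in the hypothetical topic; the list index plays the role of the offset.
    private final List<String> log = new ArrayList<>();

    // Committed position per consumer group: the offset of the next record to read.
    private final Map<String, Integer> committed = new HashMap<>();

    public void send(String message) {
        log.add(message);
    }

    /**
     * Simulates one run of the service (start, consume everything available, stop).
     * When acknowledge is false, nothing is ever committed for the group.
     */
    public List<String> runConsumer(String groupId, boolean acknowledge) {
        // Assumption: with no committed offset, start from the beginning of the log.
        int position = committed.getOrDefault(groupId, 0);
        List<String> processed = new ArrayList<>();
        while (position < log.size()) {
            processed.add(log.get(position));
            position++;
            if (acknowledge) {
                // Stands in for acknowledgment.acknowledge(): remember where to resume.
                committed.put(groupId, position);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        OffsetCommitDemo broker = new OffsetCommitDemo();
        broker.send("m1");
        broker.send("m2");

        // Never acknowledging: every "restart" sees the same records again.
        System.out.println(broker.runConsumer("no-ack", false)); // [m1, m2]
        System.out.println(broker.runConsumer("no-ack", false)); // [m1, m2]

        // Acknowledging each record: a restart only sees what arrived afterwards.
        System.out.println(broker.runConsumer("ack", true)); // [m1, m2]
        broker.send("m3");
        System.out.println(broker.runConsumer("ack", true)); // [m3]
    }
}
```

The "no-ack" group behaves like the service in the question: its committed position never moves, so each run starts over. The "ack" group behaves like the listener above that calls acknowledgment.acknowledge().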