
Kafka sends me messages that were already processed


I have a Java Service that reads messages from Kafka. The service is very simple. I have a listener:

@KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.topic.group}", containerFactory = "mesagueKafkaListenerContainerFactory")

Then, I have this conf:

  @Autowired
  private KafkaProperty kafkaProperty;

  private ConsumerFactory<String, KafkaMessage> mesagueConsumerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaProperty.getGroupIdTest());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(KafkaMessage.class));
  }

  private ConsumerFactory<String, KafkaMessage> consumerFactory(String groupId) {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(KafkaMessage.class));
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> mesagueKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(mesagueConsumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setErrorHandler(new KafkaErrorHandler());
    return factory;
  }

And then:

kafka:
  topic:
    name: name
    group: 1
  bootstrap:
    servers: xxx
  autoCommit: false
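The `KafkaProperty` class injected above is not shown in the question; a hypothetical sketch of what it presumably looks like, with field names assumed from the getters used in the consumer factories (in the real service it would typically carry `@ConfigurationProperties(prefix = "kafka")` so Spring Boot binds the YAML values):

```java
// Hypothetical sketch of the KafkaProperty class referenced above; field
// names are assumed from the getters used in the consumer factories.
public class KafkaProperty {
    private String servers;
    private String groupIdTest;
    private Boolean autoCommit;

    public String getServers() { return servers; }
    public void setServers(String servers) { this.servers = servers; }

    public String getGroupIdTest() { return groupIdTest; }
    public void setGroupIdTest(String groupIdTest) { this.groupIdTest = groupIdTest; }

    public Boolean getAutoCommit() { return autoCommit; }
    public void setAutoCommit(Boolean autoCommit) { this.autoCommit = autoCommit; }
}
```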

When I send a message to the topic, the service processes it OK.

But when I restart the service, it reads all the messages from the topic again, including messages that were already processed.

I only want to process the new messages that have not been processed yet.

Thanks in advance.


Solution

  • You are setting `enable.auto.commit` to false but never committing the offsets manually. That means the consumer never tells Kafka "hey, I've finished consuming these messages", so no offsets are stored for your consumer group. When you restart the service, the consumer finds no committed offset and falls back to the `auto.offset.reset` policy, which in your setup evidently resolves to the first message in the topic. If you want to keep auto-commit disabled (and your container is already configured with `AckMode.MANUAL_IMMEDIATE`), you need to acknowledge each record as you process it:

    @KafkaListener(....)
    public void consume(ConsumerRecord<?, ?> consumerRecord,
            Acknowledgment acknowledgment) {
    
        // Process the record ...
    
        // Commits the offset for this record right away (MANUAL_IMMEDIATE)
        acknowledgment.acknowledge();
    }
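A related setting worth knowing: when a consumer group has no committed offsets at all, the `auto.offset.reset` property decides where consumption starts. A minimal sketch of the relevant consumer properties, using plain string keys (so it runs without the Kafka client on the classpath) and assumed values for the server address and group id:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerStartupProps {
    // Builds the consumer properties relevant to the restart behavior.
    public static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("group.id", "my-group");                // assumed group id
        props.put("enable.auto.commit", "false");
        // Only consulted when the group has no committed offset:
        // "latest"   -> start from new messages only
        // "earliest" -> replay the topic from the beginning
        props.put("auto.offset.reset", "latest");
        return props;
    }
}
```

Note that once you commit offsets (manually or automatically), this property no longer matters on restart: the consumer resumes from the last committed offset.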