Tags: java, apache-kafka, kafka-consumer-api, spring-kafka

Spring Kafka offset increments even though auto commit is set to false


I am trying to implement manual offset commits for the messages received from Kafka. I have set auto commit to false, but the offset value keeps increasing.

I am not sure what the reason is; I need help resolving the issue.

Below is the code

application.yml

spring:
  application:
    name: kafka-consumer-sample
  resources:
    cache:
      period: 60m

kafka:
  bootstrapServers: localhost:9092
  options:
    enable:
      auto:
        commit: false

KafkaConfig.java

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

KafkaConsumer.java

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
    public void consume(ConsumerRecord<String, String> record) {

        System.out.println("Consumed Kafka Record: " + record);
        record.timestampType();
        System.out.println("record.timestamp() = " + record.timestamp());
        System.out.println("***********************************");
        System.out.println(record.timestamp());
        System.out.println("record.key() = " + record.key());
        System.out.println("Consumed String Message : " + record.value());
    }
}

The output is as follows:

Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11

The consumer properties are as follows.

auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000

This is after I restart the consumer. I expected the earlier data to be printed as well.

Is my understanding correct? Please note that I am restarting my Spring Boot app expecting the messages to start from the beginning, and my Kafka server and ZooKeeper are not terminated.


Solution

  • If auto acknowledgement is disabled via ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, then you also have to set the acknowledgement mode at the container level to MANUAL; otherwise the container still commits the offsets for you, because by default the ack mode is set to BATCH.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
    

    This is because, when auto acknowledgement is disabled, the container-level acknowledgement mode defaults to BATCH.

    public void setAckMode(ContainerProperties.AckMode ackMode)

    Set the ack mode to use when auto ack (in the configuration properties) is false.

    1. RECORD: Ack after each record has been passed to the listener.
    2. BATCH: Ack after each batch of records received from the consumer has been passed to the listener.
    3. TIME: Ack after this number of milliseconds (should be greater than pollTimeout; see setPollTimeout(long)).
    4. COUNT: Ack after at least this number of records have been received (the TIME and COUNT thresholds are configured as shown in the sketch below).
    5. MANUAL: Listener is responsible for acking - use an AcknowledgingMessageListener (see the listener sketch further below).

    Parameters:

    ackMode - the ContainerProperties.AckMode; default BATCH.
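
    For reference, if one of the TIME or COUNT modes were used instead of MANUAL, the thresholds they refer to would be set on the same ContainerProperties object. This is only a sketch of that variation (not something this question requires), placed inside the kafkaListenerContainerFactory() bean shown above:

    // Hypothetical variation of the bean above, using COUNT (or TIME) instead of MANUAL.
    ContainerProperties props = factory.getContainerProperties();
    props.setAckMode(ContainerProperties.AckMode.COUNT);
    props.setAckCount(50);          // commit after at least 50 records
    // or:
    // props.setAckMode(ContainerProperties.AckMode.TIME);
    // props.setAckTime(5000);      // commit on a time threshold (should be greater than pollTimeout)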

    Committing Offsets

    Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list). The default AckMode is BATCH. Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration. Previously, the Kafka default (true) was used if the property was not set.
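
    With MANUAL ack mode, the listener itself decides when the offset is committed. Below is a minimal sketch of how the consumer from the question could do that, assuming the same topic and group placeholders; Spring Kafka injects the Acknowledgment parameter when a manual ack mode is active:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaConsumer {

        @KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
        public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
            System.out.println("Consumed String Message : " + record.value());
            // Nothing is committed for this partition until acknowledge() is called.
            ack.acknowledge();
        }
    }

    Until acknowledge() is called, nothing is committed, so a restart of the application re-delivers the unacknowledged records.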

    And if you always want to read from the beginning, you have to set the property auto.offset.reset to earliest:

    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    

    Note: also make sure the groupId is a new one that does not already have any committed offsets in Kafka, because auto.offset.reset only applies when there is no committed offset for the group.
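
    For illustration only, assuming a made-up group name such as mygroup-fresh: combining a fresh group id (one with no committed offsets in Kafka) with auto.offset.reset=earliest in the consumerFactory() config makes the consumer start from the first available record.

    // "mygroup-fresh" is a hypothetical group id used only for this example.
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup-fresh");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");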