
Spring Kafka consumer doesn't commit to the Kafka server after the leader changed


I am using spring-kafka 2.1.10.RELEASE. I have a consumer with the following properties (almost all of them are copied here):

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka1.local:9093, kafka2.local:9093, kafka3.local:9093]
    check.crcs = true
    client.id = kafkaListener-0
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafkaLisneterContainer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    max.poll.interval.ms = 300000
    max.poll.records = 50
    metadata.max.age.ms = 300000
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

The Apache Kafka version in production is 2.11-1.0.0-0pan4. The cluster consists of 3 Kafka nodes.

I ran into a serious problem that I cannot reproduce locally. This is what happened:

  1. I started my application, which contains both a Kafka producer and a Kafka consumer.

  2. Everything worked fine until the leader node for my topic changed at 2019-01-17 06:47:39:

    2019-01-17/controller.2019-01-17-03.aaa-aa3.gz:2019-01-17 06:47:39,365 +0000 [controller-event-thread] [kafka.controller.KafkaController] INFO [Controller id=3] New leader and ISR for partition topic_name-0 is {"leader":1,"leader_epoch":3,"isr":[1,3]} (kafka.controller.KafkaController)

  3. After that, my consumer stopped committing offsets to Kafka. The last commit took place in the same hour and minute as the leader change: 17 January 2019, 06:47.

  4. Most mysterious: inside the application everything appears to work fine. The Spring consumer reads new messages and sends them to Kafka, and I see logs like the ones below. It looks as if the Spring consumer keeps its offset in memory and sends commits to the remote Kafka (no errors, etc.):

    2019-01-23 14:03:20,975 +0000 [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId=kafkaListener-0, groupId=kafkaLisneterContainer] Fetch READ_UNCOMMITTED at offset 164871 for partition aaa-1 returned fetch data (error=NONE, highWaterMark=164871, lastStableOffset = -1, logStartOffset = 116738, abortedTransactions = null, recordsSizeInBytes=0)
    2019-01-23 14:03:20,975 +0000 [externalbetting] [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId=kafkaListener-0, groupId=kafkaLisneterContainer] Added READ_UNCOMMITTED fetch request for partition eaaa-1 at offset 164871 to node aaa-aa1.local:9093 (id: 1 rack: null)

  5. Nevertheless, the lag reported by Apache Kafka keeps growing. If I restart my application, the Spring consumer bean is re-created and loses its in-memory offset. It then reads the lagging records from Kafka and processes them a second time.
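The effect described in steps 3 to 5 can be illustrated with a toy model. This is not Kafka client code: `OffsetReplayDemo` and `SimulatedConsumer` are hypothetical names, and the offsets are made up. It only shows why a stalled committed offset produces growing lag and reprocessing after a restart, assuming a restarted consumer resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetReplayDemo {

    /** Hypothetical stand-in for a consumer; it only tracks two offsets. */
    static final class SimulatedConsumer {
        private long position;          // next offset to read, held only in memory
        private long committedOffset;   // last offset stored on the broker
        private boolean commitsReachBroker = true;

        SimulatedConsumer(long startOffset) {
            this.position = startOffset;
            this.committedOffset = startOffset;
        }

        /** Models the moment after which commits no longer reach the broker. */
        void stopCommitting() {
            commitsReachBroker = false;
        }

        /** Processes `count` records and returns the offsets that were processed. */
        List<Long> poll(int count) {
            List<Long> processed = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                processed.add(position++);
            }
            if (commitsReachBroker) {
                committedOffset = position;
            }
            return processed;
        }

        /** Lag as the broker sees it: log end offset minus committed offset. */
        long lag(long logEndOffset) {
            return logEndOffset - committedOffset;
        }

        /** A restarted consumer forgets the in-memory position and resumes from the committed offset. */
        SimulatedConsumer restart() {
            return new SimulatedConsumer(committedOffset);
        }
    }

    public static void main(String[] args) {
        SimulatedConsumer consumer = new SimulatedConsumer(100);
        consumer.poll(50);            // offsets 100..149 processed and committed
        consumer.stopCommitting();    // commits stop reaching the broker
        consumer.poll(50);            // offsets 150..199 processed, nothing committed

        System.out.println("lag seen by the broker: " + consumer.lag(200));
        SimulatedConsumer restarted = consumer.restart();
        System.out.println("first offset read after restart: " + restarted.poll(1).get(0));
    }
}
```

In this model the application keeps processing records without errors, yet the broker-side lag equals the number of records processed since the last successful commit, and exactly those records are read again after a restart.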

Please help me find the cause!


Solution

  • When you enable auto commit (Kafka's default), the commits are managed entirely by the kafka-clients library and Spring has no control over them.

    Setting it to false allows the listener container to commit the offsets. By default it does so after each batch of records (poll result), or after every record if you set the container AckMode property to RECORD.

    The container will also reliably commit any pending offsets when partitions are revoked due to a rebalance.

    I generally recommend not using auto commit.
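A minimal configuration sketch of this advice, assuming Java config and String keys and values. The class and bean names and the deserializers are assumptions for illustration; the broker list and group id are taken from the question. In spring-kafka 2.1.x the AckMode enum is nested in AbstractMessageListenerContainer; newer versions expose it on ContainerProperties instead, so adjust the import to your version.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {  // hypothetical class name

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka1.local:9093,kafka2.local:9093,kafka3.local:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaLisneterContainer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Assumption: String payloads; use the deserializers your topic needs.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Turn off kafka-clients auto commit so the listener container commits offsets.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Optional: commit after every record instead of after each poll result.
        factory.getContainerProperties()
                .setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        return factory;
    }
}
```

Leaving out the setAckMode call keeps the default behavior of committing after each batch returned by a poll, which results in fewer commit requests than RECORD.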