How to set offset commit interval for Spring Cloud Stream Kafka via properties


I would like to set an automatic offset commit interval on Kafka consumer for Spring Cloud Stream via properties.

According to our metrics, Spring Cloud Stream Kafka by default commits the offset after each consumed message. For high-traffic topics (e.g. 10K messages per second), this puts significant extra load on the Kafka broker.

We declare consumers in the following way:

@Bean
public Consumer<TestEvent> testEvents() {
    …
}

I tried a few options, but none of them helped.

Option #1

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          consumer-properties:
            auto.offset.reset: latest
            auto.commit.interval.ms: 2000
            enable.auto.commit: true
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2

With this config, messages are consumed correctly, but the health check reports the status as DOWN:

{
  "status": "DOWN",
  "components": {
    "binders": {
      "status": "DOWN",
      "components": {
        "kafka": {
          "status": "DOWN",
          "details": {
            "error": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer"
          }
        }
      }
    }, …

As I understand it, enable.auto.commit: true is not recommended for Spring Cloud Stream.

Option #2

spring:
  cloud:
    function:
      definition: testEvents
    stream:
      kafka:
        binder:
          brokers: xxx
          configuration:
            auto.offset.reset: latest
      bindings:
        testEvents-in-0:
          destination: test_topic
          group: ${spring.application.name}_test_topic
          consumer:
            concurrency: 2
            ack-mode: TIME
            ack-time: 2000

The ack-mode: TIME property didn't help either; we still see an offset commit on each consumed message.

I use the Maven dependency spring-cloud-starter-stream-kafka, version 3.0.12.RELEASE.


Solution

  • First of all, ack-mode is a Kafka-specific consumer binding property, but you have it under the common binding properties. It belongs under spring.cloud.stream.kafka.bindings:

    spring:
      cloud:
        function:
          definition: testEvents
        stream:
          kafka:
            binder:
              brokers: localhost:9092
              configuration:
                auto.offset.reset: latest
            bindings:
              testEvents-in-0:
                consumer:
                  ack-mode: TIME
          bindings:
            testEvents-in-0:
              destination: test_topic
              group: ${spring.application.name}_test_topic
              consumer:
                concurrency: 2
    

    Second, there is no ack-time property; you have to set it on the container via a container customizer:

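    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
        return (container, dest, group) -> {
            // Commit pending offsets every 2000 ms (takes effect with ack-mode: TIME).
            container.getContainerProperties().setAckTime(2000L);
            // Log the container configuration at startup so the settings can be verified.
            container.getContainerProperties().setLogContainerConfig(true);
        };
    }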
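
    To get a feel for why a time-based ack mode reduces commit traffic, here is a rough, library-free model. It is only a sketch and not how Spring Kafka is implemented: the class AckModeSketch and both methods are hypothetical names, records are assumed to arrive at a perfectly even rate, and one final commit is assumed when the consumer stops with offsets still pending.

```java
// Hypothetical, simplified model; it does not use or mirror Spring Kafka internals.
final class AckModeSketch {

    /** Commit count when an offset is committed after every record. */
    static long commitsPerRecord(long records) {
        return records;
    }

    /**
     * Commit count when pending offsets are committed only once ackTimeMs
     * has elapsed since the previous commit (the idea behind a TIME ack mode).
     */
    static long commitsTimeBased(long records, long recordsPerSecond, long ackTimeMs) {
        long commits = 0;
        long lastCommitMs = 0;
        boolean pending = false;
        for (long i = 0; i < records; i++) {
            // Simulated arrival time, assuming an even rate (an assumption of this sketch).
            long nowMs = i * 1000 / recordsPerSecond;
            pending = true;
            if (nowMs - lastCommitMs >= ackTimeMs) {
                commits++;
                lastCommitMs = nowMs;
                pending = false;
            }
        }
        if (pending) {
            commits++; // assumed final commit when the consumer stops
        }
        return commits;
    }

    public static void main(String[] args) {
        long records = 100_000; // 10 seconds of traffic at 10K messages per second
        System.out.println("per record: " + commitsPerRecord(records));
        System.out.println("time based: " + commitsTimeBased(records, 10_000, 2_000));
    }
}
```

    With the traffic from the question (10K messages per second) and a 2000 ms ack time, this model commits 5 times in 10 seconds instead of 100,000 times. Real numbers will differ, since the container only checks the elapsed time between polls and each concurrent consumer commits independently.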