Tags: apache-kafka, kafka-consumer-api, spring-kafka, kafka-producer-api

linger.ms not working as expected in Spring Kafka


I have a use case that requires batch processing with Kafka. Suppose about 100 requests arrive within one minute: instead of publishing each request immediately, I want to batch all 100 requests and publish them to the topic at once.

But with the configuration below, no batching happens: as soon as a message is sent, it is published to the topic and immediately received by the consumer.

ProducerConfig

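import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class KafkaProducerConfig {

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks is a string-typed setting ("all", "-1", "0" or "1")
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 60000); // wait up to 60 s for more records before sending a batch
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 100000); // maximum batch size per partition, in bytes
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

}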

ConsumerConfig

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class KafkaConfig {

  ConsumerFactory<String, String> kafkaConsumerFactory(Boolean autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5); // at most 5 records per poll, i.e. per batch-listener call
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean("kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
    factory.setBatchListener(true); // hand each poll's records to the listener as a List
    factory.setConcurrency(1);
    return factory;
  }

}

Here I set linger.ms = 60000. As I understand it, once linger.ms is set, the producer waits up to that long for more records before sending a batch, even if the sender thread becomes available earlier, unless batch.size is reached first.

In my case, however, each message is published to the topic as soon as it is sent, without waiting 60000 ms or for the configured batch size to be reached.
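For comparison, the Kafka producer documentation describes linger.ms as an upper bound on the added delay: a partition's batch is sent once it reaches batch.size bytes or once linger.ms has elapsed, whichever comes first, and flush() or close() sends immediately. Note that batch.size counts bytes, not records, so 100 short messages would not come close to 100000. The sketch below is a simplified single-partition model of that rule, not the client's real code; the class name LingerModel is hypothetical, the real client closes a batch when the next record would not fit, and under heavy load it can still group records even with linger.ms = 0.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified single-partition model of the producer's send rule (hypothetical,
 * not the real RecordAccumulator): an open batch is sent when it holds at least
 * batchSizeBytes, or when lingerMs has passed since its first record arrived.
 */
final class LingerModel {

    /** Returns how many records end up in each sent batch, in send order. */
    static List<Integer> batchRecordCounts(long[] arrivalMs, int[] recordBytes,
                                           int batchSizeBytes, long lingerMs) {
        List<Integer> sent = new ArrayList<>();
        int count = 0;    // records in the open batch
        long bytes = 0;   // bytes in the open batch
        long firstMs = 0; // arrival time of the open batch's first record

        for (int i = 0; i < arrivalMs.length; i++) {
            // The open batch's linger timer expired before this record arrived,
            // so it was already sent at firstMs + lingerMs.
            if (count > 0 && arrivalMs[i] - firstMs >= lingerMs) {
                sent.add(count);
                count = 0;
                bytes = 0;
            }
            if (count == 0) {
                firstMs = arrivalMs[i];
            }
            count++;
            bytes += recordBytes[i];

            // Send right away if the batch is full or lingering is already over
            // (with lingerMs == 0 this is true for every record).
            if (bytes >= batchSizeBytes || arrivalMs[i] - firstMs >= lingerMs) {
                sent.add(count);
                count = 0;
                bytes = 0;
            }
        }
        if (count > 0) {
            sent.add(count); // the last batch leaves when its linger timer fires
        }
        return sent;
    }
}
```

With settings like the question's (linger.ms = 60000, batch.size = 100000) and small messages spread over less than a minute, the model predicts a single batch; with linger.ms = 0, which is what the log further down reports, it predicts one batch per record, matching the observed behavior.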

Producer

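  @Autowired
  private KafkaTemplate<String, String> kafka;

  // wrapper method added for completeness; the original snippet shows only the send call
  public void publish(String message) {
    kafka.send("batch-test", message);
  }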

Consumer

  @KafkaListener(id = "testGroup", topics = {"batch-test"})
  public void test(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    for (int i = 0; i < messages.size(); i++) {
      System.out.println(messages.get(i) + " " + partitions.get(i) + "-" + offsets.get(i));
    }
  }
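Separately from the producer side, the consumer factory above sets max.poll.records = 5, so even if the producer packs 100 records into one batch, each poll, and therefore each call to the batch listener, returns at most 5 records. The following hypothetical helper is a simplified model of that cap only; it ignores partitions, fetch sizes and timing, so in practice a poll can return fewer records than the cap.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: how max.poll.records bounds each list handed to a batch listener. */
final class PollModel {

    /** Splits the available records into consecutive polls of at most maxPollRecords. */
    static <T> List<List<T>> polls(List<T> available, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("max.poll.records must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < available.size(); from += maxPollRecords) {
            int to = Math.min(from + maxPollRecords, available.size());
            result.add(new ArrayList<>(available.subList(from, to)));
        }
        return result;
    }
}
```

Under this model, receiving up to 100 records per listener call would require max.poll.records of at least 100 (the Kafka default is 500).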

Update: I checked the Spring Kafka logs and found that the ProducerConfig values are not being applied; they fall back to the defaults:

ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0

Why are linger.ms and batch.size falling back to their default values?
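The logged values (acks = -1, enable.idempotence = true, batch.size = 16384, linger.ms = 0) match the kafka-clients producer defaults, which suggests the producer that printed them was not created from the producerConfigs() map at all. The question does not show why; one common cause, offered here as an assumption, is that KafkaProducerConfig is not picked up as a Spring configuration class (for example, it lacks @Configuration or sits outside the component-scan packages), in which case Spring Boot auto-configures its own ProducerFactory and KafkaTemplate from the spring.kafka.producer.* properties. That would be consistent with the application.properties workaround taking effect. To see which intended settings were not applied, a small hypothetical helper can compare the map against the logged "ProducerConfig values" block:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical diagnostic: compares intended producer settings with a logged config block. */
final class ConfigDiff {

    /** Parses "key = value" lines as printed in the "ProducerConfig values:" log block. */
    static Map<String, String> parseLoggedConfig(String logBlock) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String line : logBlock.split("\\R")) {
            int eq = line.indexOf(" = ");
            if (eq > 0) {
                values.put(line.substring(0, eq).trim(), line.substring(eq + 3).trim());
            }
        }
        return values;
    }

    /** Returns intended settings whose logged value differs, as key -> "intended vs logged". */
    static Map<String, String> mismatches(Map<String, Object> intended, Map<String, String> logged) {
        Map<String, String> diff = new TreeMap<>();
        for (Map.Entry<String, Object> e : intended.entrySet()) {
            String actual = logged.get(e.getKey());
            String wanted = String.valueOf(e.getValue()); // classes print as "class ..." like the log
            if (actual != null && !actual.equals(wanted)) {
                diff.put(e.getKey(), wanted + " vs " + actual);
            }
        }
        return diff;
    }
}
```

List-typed settings are logged in brackets (for example bootstrap.servers = [localhost:9092]), so they will show up as mismatches and are best checked by eye.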


Solution

  • I found a solution: linger.ms had no effect because the value set in code was never applied (I still don't know why), so linger.ms kept its default of 0 and no batching happened.

    I then set both linger.ms and batch.size in application.properties, and it worked.

    spring.kafka.producer.batch-size=1000000
    spring.kafka.producer.properties.linger.ms=10000
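linger.ms and batch.size only control how the producer groups records into requests; every kafka.send(...) call still produces its own record, and the consumer still receives records individually (in lists bounded by max.poll.records). If the requirement from the question is literally to collect about a minute's worth of requests and publish them together, one alternative is to buffer on the application side. A minimal sketch, assuming a hypothetical RequestBuffer class and a caller-supplied publisher callback (neither is part of Spring Kafka):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical application-side buffer: collects messages and hands them to a
 * publisher as one list once maxRecords is reached or maxWaitMs has passed
 * since the first buffered message.
 */
final class RequestBuffer {
    private final int maxRecords;
    private final long maxWaitMs;
    private final Consumer<List<String>> publisher;
    private final List<String> pending = new ArrayList<>();
    private long firstAddedAt = -1; // timestamp of the oldest pending message

    RequestBuffer(int maxRecords, long maxWaitMs, Consumer<List<String>> publisher) {
        this.maxRecords = maxRecords;
        this.maxWaitMs = maxWaitMs;
        this.publisher = publisher;
    }

    /** Adds a message; publishes immediately if the count threshold is reached. */
    synchronized void add(String message, long nowMs) {
        if (pending.isEmpty()) {
            firstAddedAt = nowMs;
        }
        pending.add(message);
        if (pending.size() >= maxRecords) {
            flush();
        }
    }

    /** Call periodically (e.g. from a scheduler) to publish a batch that has waited long enough. */
    synchronized void flushIfDue(long nowMs) {
        if (!pending.isEmpty() && nowMs - firstAddedAt >= maxWaitMs) {
            flush();
        }
    }

    /** Hands the pending messages to the publisher as one list and resets the buffer. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        firstAddedAt = -1;
        publisher.accept(batch);
    }
}
```

In the question's setup, the publisher might send each buffered message with kafka.send("batch-test", message) and then call kafka.flush(), or join the batch into a single record, while flushIfDue(System.currentTimeMillis()) is driven by something like a ScheduledExecutorService. Both choices are assumptions rather than part of the original solution.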