
Spring Kafka: consume records with a delay


I'm using Spring Kafka in my application. I want to delay the consumption of records by 15 minutes for one of my two listeners, the one that uses kafkaRetryListenerContainerFactory. Below is my configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(retryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);       
    return factory;
}

Kafka retry listener:

@KafkaListener(topics = "${spring.kafka.retry.topic}", groupId = "${spring.kafka.consumer-group-id}", 
        containerFactory = "kafkaRetryListenerContainerFactory", id = "retry.id")
public void retryMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException {
    Thread.sleep(900000); // 15 minutes
    LOG.info(String.format("Consumed retry message -> %s", record.toString()));
    acknowledgment.acknowledge();
}

When I added Thread.sleep(), I started getting continuous rebalancing errors in the logs:

Attempt to heartbeat failed since group is rebalancing

My Spring Kafka version is 2.3.4.

Below are the config values:

max.poll.interval.ms = 1200000 (20 minutes, which is longer than the 15-minute Thread.sleep())

heartbeat.interval.ms = 3000

session.timeout.ms = 10000
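A quick budget check on these values: max.poll.interval.ms bounds the time between two poll() calls, and the listener sleeps once per record on the same thread, so the limit has to cover the sleep for every record a single poll() returns, not just one. The question does not list max.poll.records, so the sketch below assumes the Kafka default of 500 and ignores processing time (which only makes things worse). `PollBudget` and its methods are hypothetical names used only for illustration.

```java
// Hypothetical helper (not part of Kafka or Spring Kafka): checks whether one
// poll's worth of delayed records fits inside max.poll.interval.ms.
// All times are in milliseconds.
class PollBudget {

    /** Worst-case time the listener thread stays away from poll() for one batch. */
    public static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    /** True if the batch would not finish before max.poll.interval.ms expires. */
    public static boolean exceedsPollInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) >= maxPollIntervalMs;
    }

    /** Largest max.poll.records whose batch still finishes in time (0 if none does). */
    public static int maxRecordsThatFit(long perRecordMs, long maxPollIntervalMs) {
        return (int) ((maxPollIntervalMs - 1) / perRecordMs);
    }

    public static void main(String[] args) {
        long sleepMs = 900_000L;             // Thread.sleep(900000) = 15 minutes
        long maxPollIntervalMs = 1_200_000L; // max.poll.interval.ms = 20 minutes
        int assumedMaxPollRecords = 500;     // assumption: Kafka default max.poll.records

        System.out.println(worstCaseBatchMs(assumedMaxPollRecords, sleepMs));                       // 450000000
        System.out.println(exceedsPollInterval(assumedMaxPollRecords, sleepMs, maxPollIntervalMs)); // true
        System.out.println(exceedsPollInterval(1, sleepMs, maxPollIntervalMs));                     // false
        System.out.println(maxRecordsThatFit(sleepMs, maxPollIntervalMs));                          // 1
    }
}
```

With a 15-minute sleep and a 20-minute max.poll.interval.ms, only one record fits per poll, so the retry consumer would presumably need max.poll.records set to 1 (ConsumerConfig.MAX_POLL_RECORDS_CONFIG in retryConsumerFactory()). Otherwise the consumer misses the poll deadline, leaves the group, and the remaining members see the rebalance.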

I have also tried acknowledgment.nack(900000), but I still get the rebalancing error.

Any help will be appreciated


Solution

  • A filter is not the right approach; you need to call Thread.sleep() on the listener thread and make sure that max.poll.interval.ms is larger than the total sleep and processing time for all the records returned by a single poll.

    Starting with 2.3, the container can sleep between polls itself (the idleBetweenPolls container property); with earlier versions, you have to do the sleep yourself.

    EDIT

    I just found this in my server.properties (Homebrew on macOS):

    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    

    That explains why we saw the partitions initially assigned to the first consumer (see the comments).

    Setting it back to the default of 3000 works for me.
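The answer above sleeps a fixed amount per record. For a retry topic, one possible variation (an assumption, not something the answer prescribes) is to sleep only for whatever remains of the 15 minutes since the record's timestamp, so a record that has already waited in the topic is not delayed a second time. This is a minimal plain-Java sketch; `RetryDelay` and `remaining` are hypothetical names, and it assumes the record timestamp reflects when the record was sent to the retry topic (Kafka's default CreateTime).

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not part of Spring Kafka): how long to sleep so that a
// record is processed no earlier than DELAY after its timestamp.
class RetryDelay {

    static final Duration DELAY = Duration.ofMinutes(15);

    /** Remaining wait for a record with the given timestamp; never negative. */
    public static Duration remaining(long recordTimestampMs, Instant now) {
        Instant due = Instant.ofEpochMilli(recordTimestampMs).plus(DELAY);
        Duration left = Duration.between(now, due);
        return left.isNegative() ? Duration.ZERO : left;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        long producedTenMinutesAgo = now.minus(Duration.ofMinutes(10)).toEpochMilli();
        long producedAnHourAgo = now.minus(Duration.ofHours(1)).toEpochMilli();

        System.out.println(remaining(producedTenMinutesAgo, now)); // PT5M
        System.out.println(remaining(producedAnHourAgo, now));     // PT0S
    }
}
```

In the listener this would replace the fixed sleep with something like `Thread.sleep(RetryDelay.remaining(record.timestamp(), Instant.now()).toMillis());`. Each wait is normally at most 15 minutes, so the max.poll.interval.ms budget described in the answer still applies to the whole batch returned by one poll.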