Search code examples
apache-kafkakafka-consumer-apispring-kafkakafka-producer-api

Not able to Pause Spring Kafka Container


I want to pause the Kafka Listener.

  1. I call http://localhost:8080/pause to the pause the container
  2. I send data on the topic.
  3. Consumer starts consuming the message

I expected that consumer would be paused but message is still being read from Kafka topic. How can i pause the consumer? I am using 2.5.0 (Spring Boot Parent)

     @KafkaListener(id="foo" ,topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
        

@Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        
          @GetMapping("/pause")
            public void pause(  )
            {
                
                System.out.println(" Pausing Kafka Listener");
                kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
            }
         
        
          @Configuration
    @EnableKafka
    public class KafkaConsumerConfig implements KafkaListenerConfigurer {
        
        
        @Autowired
        private LocalValidatorFactoryBean validator;
        
       
        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
              registrar.setValidator(this.validator);
            
        }
        

Solution

  • There is a bug in all 2.7.x versions. Fixed today where the container is immediately resumed after pausing.

    Boot 2.5 (currently 2.5.2) pulls in 2.7.x; next week's 2.5.3 release should have the fix (spring-kafka 2.7.4).

    The fix is in a snapshot 2.7.4-SNAPSHOT which you can get by adding the https://repo.spring.io/snapshot to your build config.

    2.7.4 should be available on Monday (and boot 2.5.3 later in the week).

    You could also tenporarily downgrade to 2.6.9 (spring-kafka).