I want to pause the Kafka Listener.
I expected the consumer to be paused, but messages are still being read from the Kafka topic. How can I pause the consumer? I am using 2.5.0 (Spring Boot parent).
@KafkaListener(id = "foo", topics = "mytopic-3", concurrency = "3", groupId = "mytopic-1-groupid")
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@GetMapping("/pause")
public void pause() {
    System.out.println(" Pausing Kafka Listener");
    kafkaListenerEndpointRegistry.getListenerContainer("foo").pause();
}
@Configuration
@EnableKafka
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }
}
There is a bug in all 2.7.x versions where the container is immediately resumed after pausing; it was fixed today.
Boot 2.5 (currently 2.5.2) pulls in spring-kafka 2.7.x; next week's Boot 2.5.3 release should include the fix (spring-kafka 2.7.4).
The fix is in the 2.7.4-SNAPSHOT build, which you can get by adding the https://repo.spring.io/snapshot repository to your build config.
2.7.4 should be available on Monday (and Boot 2.5.3 later in the week).
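For reference, here is a rough sketch of what pulling in the snapshot might look like. It assumes a Maven build that inherits from spring-boot-starter-parent (the question does not say which build tool is used), so that the `spring-kafka.version` property overrides the version managed by Boot. The repository `id` is an arbitrary label.

```xml
<!-- pom.xml fragment; assumes the project inherits from spring-boot-starter-parent -->
<properties>
    <!-- Overrides the spring-kafka version managed by Spring Boot -->
    <spring-kafka.version>2.7.4-SNAPSHOT</spring-kafka.version>
</properties>

<repositories>
    <repository>
        <!-- The id is an arbitrary label chosen for this example -->
        <id>spring-snapshots</id>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
```

Once 2.7.4 is released, the snapshot repository and the version override can be removed again.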
You could also temporarily downgrade spring-kafka to 2.6.9.
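With a Maven build that inherits from spring-boot-starter-parent (an assumption, since the build tool is not stated in the question), the downgrade could probably be done by overriding the managed version, roughly like this:

```xml
<!-- pom.xml fragment; assumes the project inherits from spring-boot-starter-parent -->
<properties>
    <!-- Temporary downgrade; remove once Boot 2.5.3 / spring-kafka 2.7.4 is available -->
    <spring-kafka.version>2.6.9</spring-kafka.version>
</properties>
```

With Gradle or a build that does not use the Boot parent, the version would need to be pinned in that build's own way.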