spring-kafka, spring-cloud-stream

Listing information about all my Kafka consumers


I want to list all my Kafka consumers and inspect their state, group ...

Previously I was using only spring-kafka, so I did the following, and it worked:

private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

var listenerContainers = kafkaListenerEndpointRegistry.getAllListenerContainers();

listenerContainers.forEach(mlc -> {
    var isRunning = mlc.isRunning();
    var group = mlc.getGroupId();
    // other checks
});

But now that I am using spring-cloud-stream with Kafka, listenerContainers is an empty list!

How can I do the same with Spring Cloud Stream (SCS)?


Solution

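  • With Spring Cloud Stream, the Kafka binder creates its listener containers itself rather than through @KafkaListener, so they are never registered in the KafkaListenerEndpointRegistry. Instead, add a ListenerContainerCustomizer bean and capture each listener container as it is created (e.g. store them in a list).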

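    // Holds every listener container the binder creates.
    private final List<AbstractMessageListenerContainer<?, ?>> containers = new ArrayList<>();
    
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        // Invoked for each container, with its destination (topic) and consumer group.
        return (container, dest, group) -> this.containers.add(container);
    }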
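
    Once the containers are captured, the checks from the question can run against that collection instead of the registry. Below is a minimal, dependency-free sketch of that capture-then-inspect pattern (Java 16+). ContainerView, ConsumerInfo and ContainerInventory are hypothetical names introduced only for illustration; ContainerView stands in for the two methods the question calls (isRunning() and getGroupId()), and in a real application you would use AbstractMessageListenerContainer<?, ?> in its place.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the two container methods used in the question.
// In a real application, use AbstractMessageListenerContainer<?, ?> instead.
interface ContainerView {
    boolean isRunning();
    String getGroupId();
}

// Hypothetical immutable summary of one captured consumer.
record ConsumerInfo(String destination, String group, boolean running) {}

// Collects containers as they are handed over and reports their state on demand.
class ContainerInventory {
    private record Entry(String destination, ContainerView container) {}

    // Copy-on-write list so the inventory can be read while containers are still being added.
    private final List<Entry> entries = new CopyOnWriteArrayList<>();

    // Intended to be called from the customizer lambda, once per container.
    void capture(String destination, ContainerView container) {
        entries.add(new Entry(destination, container));
    }

    // State is read at call time, so a container stopped later is reported as not running.
    List<ConsumerInfo> snapshot() {
        return entries.stream()
                .map(e -> new ConsumerInfo(
                        e.destination(),
                        e.container().getGroupId(),
                        e.container().isRunning()))
                .toList();
    }
}
```

    With the real Spring types swapped in, the customizer body from the answer would become something like inventory.capture(dest, container), and snapshot() could back whatever endpoint or log statement you use to list your consumers.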