spring-boot spring-kafka

Stop consuming messages with a StreamListener


I am looking for a way to stop consuming messages with a stream listener.

@StreamListener(MBinding.M_INPUT)
public void consumeMessage(Message<MerchantEvent> message) {
    // handle the received message
}

cloud:
        stream:
            bindings:
                MInput:
                    destination: topicName
                    group: groupName

I have searched for this but still have no idea how to stop consuming. Does anyone know how to do it?


Solution

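  • You can do this with the actuator bindings endpoint (see "Binding Visualization and Control" in the Spring Cloud Stream reference documentation), or you can inject the BindingsEndpoint and change the binding's state programmatically: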

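    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.endpoint.BindingsEndpoint;
    import org.springframework.cloud.stream.endpoint.BindingsEndpoint.State;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.context.annotation.Bean;

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class So58795176Application {

        public static void main(String[] args) {
            SpringApplication.run(So58795176Application.class, args);
        }

        @StreamListener(Sink.INPUT)
        public void listen(String in) {
            // Print each received payload
            System.out.println(in);
        }

        @Autowired
        BindingsEndpoint endpoint;

        @Bean
        public ApplicationRunner runner() {
            return args -> {
                // Press Enter once to stop the "input" binding (the consumer stops receiving)
                System.in.read();
                endpoint.changeState("input", State.STOPPED);
                // Press Enter again to start the binding and resume consuming
                System.in.read();
                endpoint.changeState("input", State.STARTED);
            };
        }

    }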
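
For the actuator route, the reference documentation describes a POST to /actuator/bindings/{bindingName} with a JSON body such as {"state":"STOPPED"}. The endpoint has to be exposed first (management.endpoints.web.exposure.include=bindings, with the web and actuator dependencies on the classpath). Below is a minimal sketch of building that request with the JDK's own HTTP client. The class name, the base URL http://localhost:8080, and the binding name MInput (taken from the question's YAML) are assumptions for illustration; adjust them to your application.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Spring: builds the actuator request by hand.
class BindingStateRequests {

    // Builds a POST to <baseUrl>/actuator/bindings/<bindingName> asking for the given state.
    // state is expected to be one of STARTED, STOPPED, PAUSED, RESUMED.
    public static HttpRequest changeState(String baseUrl, String bindingName, String state) {
        String body = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Assumed host, port, and binding name; replace with your own values.
        HttpRequest stop = changeState("http://localhost:8080", "MInput", "STOPPED");
        System.out.println(stop.method() + " " + stop.uri());
        // To send it against a running application:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(stop, java.net.http.HttpResponse.BodyHandlers.discarding());
    }
}
```

Sending the same request with "STARTED" should resume consumption. Note that the binding name in the URL is the name used under spring.cloud.stream.bindings, not the destination topic.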