Tags: java, spring-boot, apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Pause message consumption from the main Kafka stream and start consuming from another Kafka topic


I am using @StreamListener (Spring Cloud Stream) to consume messages from a topic (input-channel), process them, and save the results to a cache or database.

My requirement: if the DB goes down while a consumed message is being processed, I want to pause the main consumer (input-channel) and start consuming from another topic (INPUT56-CHANNEL). As soon as all messages from INPUT56-CHANNEL have been consumed (there are not many), I want to resume the main consumer (input-channel).

Can this be achieved?


Solution

  • @StreamListener is deprecated; you should convert to the functional programming model instead.

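    Here is an example using that model (the same techniques apply to the deprecated listeners). When the DB is down, input1 pauses its own binding and starts the input2 binding; when input2 goes idle, an idle event resumes input1 and stops input2.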

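    # application.properties
    spring.cloud.function.definition=input1;input2
    
    spring.cloud.stream.bindings.input1-in-0.group=grp1
    # input2 is not started with the application; it is started on demand
    spring.cloud.stream.bindings.input2-in-0.consumer.auto-startup=false
    spring.cloud.stream.bindings.input2-in-0.group=grp2
    
    # publish an idle event when input2 has received no records for 5000 ms
    spring.cloud.stream.kafka.bindings.input2-in-0.consumer.idle-event-interval=5000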
    
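    import java.util.function.Consumer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.binding.BindingsLifecycleController;
    import org.springframework.cloud.stream.binding.BindingsLifecycleController.State;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.event.EventListener;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.kafka.event.ListenerContainerIdleEvent;
    
    @SpringBootApplication
    public class So69726610Application {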
    
        public static void main(String[] args) {
            SpringApplication.run(So69726610Application.class, args);
        }
    
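        // simulates the DB outage; replace with a real availability check
        boolean dbIsDown = true;
    
        @Autowired
        BindingsLifecycleController controller;
    
        // used to stop the input2 binding off the consumer thread
        TaskExecutor exec = new SimpleAsyncTaskExecutor();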
    
        @Bean
        public Consumer<String> input1() {
            return str -> {
                System.out.println(str);
                if (this.dbIsDown) {
                    // pause the main binding and start the secondary one
                    this.controller.changeState("input1-in-0", State.PAUSED);
                    this.controller.changeState("input2-in-0", State.STARTED);
                    // abort processing of the current record
                    throw new RuntimeException("Paused");
                }
            };
        }
    
        @Bean
        public Consumer<String> input2() {
            return System.out::println;
        }
    
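        @EventListener
        public void idle(ListenerContainerIdleEvent event) {
            System.out.println(event);
            // assumes concurrency = 1 (default)
            if (event.getListenerId().contains("input2-in-0")) {
                // input2 has been idle for the configured interval, so its topic is
                // treated as drained: resume the main binding
                this.controller.changeState("input1-in-0", State.RESUMED);
                // stop input2 on another thread; the idle event arrives on the
                // consumer thread, which should not stop its own container
                this.exec.execute(() -> this.controller.changeState("input2-in-0", State.STOPPED));
            }
        }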
    
    }
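
To make the control flow easier to follow without a broker, here is a minimal, framework-free sketch of the same pause / drain / resume sequence. It is only a model: FailoverSketch, its State enum, onMain, drainFallback and onFallbackIdle are hypothetical names invented for illustration and are not part of Spring Cloud Stream. The two state fields stand in for the input1-in-0 and input2-in-0 bindings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Framework-free model of the pause / drain / resume flow; all names are hypothetical. */
class FailoverSketch {

    enum State { RUNNING, PAUSED, STOPPED }

    State main = State.RUNNING;      // plays the role of the input1-in-0 binding
    State fallback = State.STOPPED;  // plays the role of input2-in-0 (auto-startup=false)
    boolean dbIsDown = false;
    final List<String> log = new ArrayList<>();

    /** Called for each record on the main topic; returns true if it was processed. */
    boolean onMain(String record) {
        if (main != State.RUNNING) {
            return false;             // a paused consumer receives no records
        }
        if (dbIsDown) {
            main = State.PAUSED;      // stop taking records from the main topic
            fallback = State.RUNNING; // start the secondary consumer
            log.add("paused on " + record);
            return false;             // the current record is not processed
        }
        log.add("main:" + record);
        return true;
    }

    /** Consumes everything on the fallback topic, then reports "idle". */
    void drainFallback(Queue<String> fallbackTopic) {
        while (fallback == State.RUNNING && !fallbackTopic.isEmpty()) {
            log.add("fallback:" + fallbackTopic.poll());
        }
        if (fallback == State.RUNNING) {
            onFallbackIdle();
        }
    }

    /** Mirrors the idle-event handler: resume the main consumer, stop the fallback. */
    void onFallbackIdle() {
        main = State.RUNNING;
        fallback = State.STOPPED;
    }

    public static void main(String[] args) {
        FailoverSketch s = new FailoverSketch();
        s.onMain("a");
        s.dbIsDown = true;
        s.onMain("b");                // triggers the switch to the fallback topic
        s.dbIsDown = false;           // assumption: the DB recovers while the fallback drains
        s.drainFallback(new ArrayDeque<>(List.of("x", "y")));
        s.onMain("b");                // the record is handled once the main consumer resumes
        System.out.println(s.log);
    }
}
```

Like the Spring example, this sketch resumes the main consumer as soon as the fallback topic is empty and does not check whether the DB has actually recovered; a real application would probably want that check before resuming.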