Search code examples
javaapache-kafkaspring-kafka

Spring Kafka MessageListenerContainer Resume/Pause # spring-kafka


As native KafkaConsumer is not thread safe, so it is discouraged to call pause and resume methods from different thread instead of kafka-consumer processing thread. but as spring-kafka provides another layer KafkaMessageListenerContainer which internally use kafka-consumer. So my question is can we use KafkaListenerEndpointRegistry to get the listener container by id and call resume or pause method from other thread rather than consumer processing thread.

kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
       

    ExecutorService executorService  = newFixedThreadPool(2);
    executorService.submit(()->{

        System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
    });

Solution

  • Yes; container.pause() sets a flag to tell the Consumer thread to pause before its next poll() call. Similarly, resume() resets the flag so the consumer thread will resume the Consumer before the next poll.