spring, spring-boot, apache-kafka, spring-kafka

How to stop micro service with Spring Kafka Listener, when connection to Apache Kafka Server is lost?


I am currently implementing a microservice that reads data from an Apache Kafka topic. I am using spring-boot 1.5.6.RELEASE for the microservice and spring-kafka 1.2.2.RELEASE for the listener in the same microservice. This is my Kafka configuration:

@Bean
public Map<String, Object> consumerConfigs() {
    // A plain HashMap avoids double-brace initialization, which creates an anonymous subclass.
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

I have implemented the listener via the @KafkaListener annotation:

@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
    //business logic
    latch.countDown();
}

I need to be able to shut down the microservice when the listener loses its connection to the Apache Kafka server.

When I kill the Kafka server, I get the following message in the Spring Boot log:

2017-11-01 19:58:15.721  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup

When I start the Kafka server again, I get:

2017-11-01 20:01:37.748  INFO 16800 --- [      0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.

So clearly the Spring Kafka listener in my microservice is able to detect when the Kafka server is up and running and when it is not. The Confluent book Kafka: The Definitive Guide, in the section "But How Do We Exit?", says that the wakeup() method needs to be called on the Consumer so that a WakeupException is thrown. So I tried to capture the two events (Kafka server down and Kafka server up) with the @EventListener annotation, as described in the Spring for Apache Kafka documentation, and then call wakeup(). But the example in the documentation shows how to detect an idle consumer, which is not my case. Could someone please help me with this? Thanks in advance.


Solution

  • I don't know how to get a notification of the server down condition (in my experience, the consumer goes into a tight loop within the poll()).

    However, if you figure that out, you can stop the listener container(s) which will wake up the consumer and exit the tight loop...

    @Autowired
    private KafkaListenerEndpointRegistry registry;
    
    ...
    
        this.registry.stop();
    

    2017-11-01 16:29:54.290 INFO 21217 --- [ad | so47062346] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator localhost:9092 (id: 2147483647 rack: null) dead for group so47062346

    2017-11-01 16:29:54.346 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

    ...

    2017-11-01 16:30:00.643 WARN 21217 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 0 could not be established. Broker may not be available.

    2017-11-01 16:30:00.680 INFO 21217 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer stopped

    You can slow the tight loop down by setting reconnect.backoff.ms, but the poll() still never exits, so the container can't emit an idle event.

    spring:
      kafka:
        consumer:
          enable-auto-commit: false
          group-id: so47062346
        properties:
          reconnect.backoff.ms: 1000
    

    I suppose you could enable idle events and use a timer to detect if you've received no data (or idle events) for some period of time, and then stop the container(s).
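The timer idea above could be sketched roughly as follows. This is only an illustration in plain Java: ActivityWatchdog and all of its method names are hypothetical and are not part of spring-kafka. The assumption is that recordActivity() is called both from the @KafkaListener method and from an @EventListener method handling ListenerContainerIdleEvent (with idle events enabled on the container properties), and that the timeout callback is something like registry::stop. Because poll() never returns while the broker is down, neither records nor idle events arrive, so prolonged silence is treated as the "server down" signal.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper (not a spring-kafka class): runs a callback once
// when no activity has been recorded for longer than timeoutMs.
class ActivityWatchdog {
    private final long timeoutMs;
    private final LongSupplier clock;   // current time in ms, e.g. System::currentTimeMillis
    private final Runnable onTimeout;   // assumed to be e.g. registry::stop
    private final AtomicLong lastActivity;
    private final AtomicBoolean fired = new AtomicBoolean(false);

    ActivityWatchdog(long timeoutMs, LongSupplier clock, Runnable onTimeout) {
        this.timeoutMs = timeoutMs;
        this.clock = clock;
        this.onTimeout = onTimeout;
        this.lastActivity = new AtomicLong(clock.getAsLong());
    }

    // Call whenever a record or an idle event is received.
    void recordActivity() {
        lastActivity.set(clock.getAsLong());
    }

    // Runs the callback at most once, after the quiet period exceeds the timeout.
    void check() {
        long quietFor = clock.getAsLong() - lastActivity.get();
        if (quietFor > timeoutMs && fired.compareAndSet(false, true)) {
            onTimeout.run();
        }
    }

    boolean hasFired() {
        return fired.get();
    }

    // Schedules check() periodically on a daemon thread; the caller owns the scheduler.
    ScheduledExecutorService start(long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "kafka-activity-watchdog");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(this::check, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

For this to work, the timeout would have to be comfortably larger than the configured idle event interval; otherwise a healthy but quiet topic would also trigger the callback. The clock is injected only so the check can be exercised without waiting in real time.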