Search code examples
spring-bootapache-kafkaspring-cloud-streamresilience4j

Kafka Consumer with Circuit Breaker, Retry Patterns using Resilience4j


I need some help in understanding how I can come up with a solution using Spring boot, Kafka, Resilence4J to achieve a microservice call from my Kafka Consumer. Let's say if the Microservice is down then I need to notify my Kafka consumer using a circuit breaker pattern to stop fetching the messages/events until the Microservice is up and running.


Solution

  • With Spring Kafka, you could use the pause and resume methods depending on the CircuitBreaker state transitions. The best way I found for this is to define it as "supervisor" with an @Configuration Annotation. Resilience4j is also used.

    @Configuration
    public class CircuitBreakerConsumerConfiguration {
    
    public CircuitBreakerConsumerConfiguration(CircuitBreakerRegistry circuitBreakerRegistry, KafkaManager kafkaManager) {
        circuitBreakerRegistry.circuitBreaker("yourCBName").getEventPublisher().onStateTransition(event -> {
      
            switch (event.getStateTransition()) {
                case CLOSED_TO_OPEN:
                case CLOSED_TO_FORCED_OPEN:
                case HALF_OPEN_TO_OPEN:
                    kafkaManager.pause();
                    break;
                case OPEN_TO_HALF_OPEN:
                case HALF_OPEN_TO_CLOSED:
                case FORCED_OPEN_TO_CLOSED:
                case FORCED_OPEN_TO_HALF_OPEN:
                    kafkaManager.resume();
                    break;
                default:
                    throw new IllegalStateException("Unknown transition state: " + event.getStateTransition());
            }
        });
       }
    }
    

    This is what I used in combination with a KafkaManager annotated with @Component.

    @Component
    public class KafkaManager {
      private final KafkaListenerEndpointRegistry registry;
    
      public KafkaManager(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
      }
      public void pause() {   
        registry.getListenerContainers().forEach(MessageListenerContainer::pause);
      }
    
      public void resume() {
        registry.getListenerContainers().forEach(MessageListenerContainer::resume);
      }
    }
    

    In addition my consumer service looks like this:

      @KafkaListener(topics = "#{'${topic.name}'}", concurrency = "1", id = "CBListener")
    public void receive(final ConsumerRecord<String, ReplayData> replayData, Acknowledgment acknowledgment) throws
            Exception {
    
        try {
            httpClientServiceCB.receiveHandleCircuitBreaker(replayData);
            acknowledgement.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(1000);
        }
    }
    

    And the @CircuitBreaker Annotation:

    @CircuitBreaker(name = "yourCBName")
    public void receiveHandleCircuitBreaker(ConsumerRecord<String, ReplayData> replayData) throws
            Exception {
        try {
            String response = restTemplate.getForObject("http://localhost:8081/item", String.class);
        } catch (Exception e                                                                       ) {
           
            // throwing the exception is needed to trigger the Circuit Breaker state change
            throw new Exception();
        }
    }
    

    And this is additionally supplemented by the following application.properties

      resilience4j.circuitbreaker.instances.yourCBName.failure-rate-threshold=80
      resilience4j.circuitbreaker.instances.yourCBName.sliding-window-type=COUNT_BASED
      resilience4j.circuitbreaker.instances.yourCBName.sliding-window-size=5
      resilience4j.circuitbreaker.instances.yourCBName.wait-duration-in-open-state=10000
      resilience4j.circuitbreaker.instances.yourCBName.automatic-transition-from-open-to-half-open-enabled=true
      spring.kafka.consumer.enable.auto.commit = false
      spring.kafka.listener.ack-mode = MANUAL_IMMEDIATE
    

    Also have a look at https://resilience4j.readme.io/docs/circuitbreaker