spring-boot, apache-kafka, distributed-system

How to stop consuming messages from Kafka and stop calling another service's REST API when that service fails


I have a service A (built with Spring Boot) that consumes payloads from several Kafka topics, processes them, and sends them to a service B over a REST API. If service B goes down for some reason, how should I handle the failure? Service A keeps listening for messages and keeps calling service B's API to send them. I need a mechanism to recover from this situation. Below are some approaches I have considered, but I am not sure whether they are reliable.

  1. If there is a mechanism to stop consuming messages from Kafka, I could implement this with the circuit breaker pattern. (But in that case, how do I recover a payload that has already been consumed? If I store it in an in-memory cache, it could be lost when service A restarts or fails.)

  2. Consume the messages, store them in an in-memory cache, and send them all sequentially on startup of service A. The challenge is that an in-memory cache can lose its contents, and preserving the message order while storing them in memory is another problem.

PS: I can't use any external storage here.


Solution

  •  service A is keep listening to messages and keep calling the API of service B to send the messages

    It doesn't need to. You could retry the REST call a few times and, if it still fails, stop polling and close or pause the consumer. You would not lose those messages as long as you do not commit the offsets of the failed records and the topic's retention window has not passed.
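
The retry-then-pause decision can be sketched in plain Java, independent of any Kafka client. `RetryingForwarder`, its `sender` predicate (standing in for the REST call to service B), and the attempt and back-off values are all hypothetical names chosen for illustration, not part of Spring or Kafka.

```java
import java.util.function.Predicate;

/** Hypothetical helper: retries a delivery a few times, then reports failure to the caller. */
final class RetryingForwarder<T> {
    private final Predicate<T> sender;  // returns true when service B accepted the payload
    private final int maxAttempts;
    private final long backoffMillis;

    RetryingForwarder(Predicate<T> sender, int maxAttempts, long backoffMillis) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /**
     * Returns true if the payload was delivered, so its offset may be committed.
     * Returns false if every attempt failed; the caller should then pause the
     * consumer and must not commit the offset of this record.
     */
    boolean deliver(T payload) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (sender.test(payload)) {
                    return true;
                }
            } catch (RuntimeException e) {
                // A thrown exception (for example a connection error) counts as a failed attempt.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

In a listener, a `false` result would be the signal to pause the consumer and leave the record uncommitted, so that it is read again once consumption resumes.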

    Then you need some way to detect when service B comes back up. One option is a custom Spring Boot Actuator health check that periodically probes the remote HTTP endpoint for connectivity. When it reports healthy, you can resume or recreate the consumer.
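
As a rough illustration of the monitoring step, here is a minimal sketch that uses only the JDK (Java 11+) rather than Actuator. `RecoveryWatcher`, `markPaused`, `checkOnce`, and `httpProbe` are hypothetical names, and treating an HTTP 200 from a health URL as "healthy" is an assumption about service B.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical watcher: while consumption is paused, probes service B and resumes once it answers. */
final class RecoveryWatcher {
    private final BooleanSupplier healthy;  // true when service B looks reachable
    private final Runnable onRecovered;     // e.g. resume the paused consumer
    private final AtomicBoolean paused = new AtomicBoolean(false);

    RecoveryWatcher(BooleanSupplier healthy, Runnable onRecovered) {
        this.healthy = healthy;
        this.onRecovered = onRecovered;
    }

    /** Call this when the consumer has been paused because service B failed. */
    void markPaused() {
        paused.set(true);
    }

    /** Runs one probe; returns true only if this call triggered the resume callback. */
    boolean checkOnce() {
        if (!paused.get()) {
            return false;  // nothing to recover from
        }
        boolean up;
        try {
            up = healthy.getAsBoolean();
        } catch (RuntimeException e) {
            up = false;
        }
        if (up && paused.compareAndSet(true, false)) {
            onRecovered.run();
            return true;
        }
        return false;
    }

    /** Probe that treats an HTTP 200 from the given URL as healthy (an assumption about service B). */
    static BooleanSupplier httpProbe(String url) {
        HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(2)).GET().build();
        return () -> {
            try {
                return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode() == 200;
            } catch (IOException e) {
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        };
    }
}
```

`checkOnce` is meant to be called on a fixed schedule, for example from a `ScheduledExecutorService` or a Spring `@Scheduled` method. The compare-and-set ensures the resume callback runs only once per outage.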

    Spring Kafka lets you control the listener programmatically through a KafkaListenerEndpointRegistry.
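
A minimal sketch of that control, assuming a Spring Boot application with spring-kafka on the classpath. The listener id `serviceBForwarder`, the topic name `payload-topic`, and the class name are hypothetical. Whether a failed record is redelivered after resuming depends on the container's ack mode and error handler, which are not shown here.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ServiceBListenerControl {

    // Hypothetical id; it must match the id on the @KafkaListener below.
    static final String LISTENER_ID = "serviceBForwarder";

    private final KafkaListenerEndpointRegistry registry;

    public ServiceBListenerControl(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @KafkaListener(id = LISTENER_ID, topics = "payload-topic")
    public void onMessage(String payload) {
        // Call service B here. If it keeps failing after a few retries,
        // call pause() and make sure the offset of this record is not committed.
    }

    /** Asks the container to pause; this takes effect before its next poll. */
    public void pause() {
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container != null) {
            container.pause();
        }
    }

    /** Resumes the container if a pause was requested earlier. */
    public void resume() {
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container != null && container.isPauseRequested()) {
            container.resume();
        }
    }
}
```

A paused container keeps polling without fetching records, so the consumer stays in its group and no rebalance is triggered. Calling `stop()` and later `start()` on the container is the alternative if you prefer to close the consumer entirely.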