spring-kafka

Seek to an offset via an external trigger


Currently I use the AcknowledgingMessageListener to implement a Kafka consumer using spring-kafka. This implementation lets me listen on a specific topic and process messages with a manual ack. I now need to build the following capability: suppose that, because of some environmental exception or bad data arriving via this topic, I need to replay data on a topic from one specific offset to another. This would be a manual trigger (most likely the execution of a Java class).

It would be ideal if I could retrieve the messages between those offsets and feed them to a replay topic so that a new consumer can process them, keeping the offsets on the original topic intact.

  1. ConsumerSeekAware interface: if this is the answer, how can I trigger it externally, for example via mvn -Dexec? I am not sure if this is even possible.
  2. Also, say I have a crash timestamp: is it possible to introspect the topic to find the offset corresponding to the crash so that I can replay from that offset?
  3. Can I find the offsets corresponding to some specific data so that I can replay those specific offsets?

All of these requirements are aimed at building a resilience layer around our Kafka capabilities. I need all of this to be managed by a separate executable class that can be triggered manually with the relevant data (such as timestamps). This class should determine the offsets, seek to them, retrieve the corresponding messages, and post them to a separate topic. Can someone please point me in the right direction? I’m afraid I’m going around in circles.


Solution

  • so that a new consumer can process those messages thus keeping the offsets intact on the original topic.

    Just create a new listener container with a different group id (new consumer) and use a ConsumerAwareRebalanceListener (or ConsumerSeekAware) to perform the seeks when the partitions are assigned.

    Here is a sample CARL (ConsumerAwareRebalanceListener) that seeks all assigned topic partitions based on a timestamp.
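
The sample referred to above is not reproduced on this page. As a rough idea of what such a listener could look like, here is a minimal sketch, not the answerer's original code. The class name SeekToTimestampRebalanceListener and the timestampMs field are hypothetical; the calls used (offsetsForTimes, seek) are the standard Kafka Consumer methods, and the sketch assumes spring-kafka and kafka-clients are on the classpath.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

/**
 * Hypothetical example class: on assignment, seeks each partition to the
 * earliest offset whose record timestamp is at or after the given time.
 */
public class SeekToTimestampRebalanceListener implements ConsumerAwareRebalanceListener {

    // Assumed input, e.g. the crash time in epoch milliseconds.
    private final long timestampMs;

    public SeekToTimestampRebalanceListener(long timestampMs) {
        this.timestampMs = timestampMs;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Ask the broker for the offset matching the timestamp on every assigned partition.
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, timestampMs);
        }
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : found.entrySet()) {
            // The value is null when the partition has no record at or after the timestamp;
            // in that case the position is left untouched.
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue().offset());
            }
        }
    }
}
```

A listener like this would typically be registered on the new container through its ContainerProperties (setConsumerRebalanceListener), with the timestamp supplied by whatever triggers the replay, for instance a command-line argument.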

    You will need some mechanism to know when the new consumer should stop consuming (at which point you can stop() the new container). Maybe set max.poll.records=1 on the new consumer so it doesn't prefetch past the failure point.
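
One possible shape for such a stop mechanism is plain bookkeeping over a per-partition stop offset. The sketch below is an illustration only: ReplayStopTracker is a hypothetical helper, not part of spring-kafka, and it assumes you already know, for each partition, the first offset that should not be replayed. Partitions are identified by a simple string key such as "orders-0" to keep the example free of Kafka dependencies.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks when every partition has reached its stop offset. */
class ReplayStopTracker {

    // Partition key -> first offset that must NOT be replayed (exclusive upper bound).
    private final Map<String, Long> stopOffsets;
    // Partitions that may still have records inside the replay range.
    private final Set<String> pending;

    ReplayStopTracker(Map<String, Long> stopOffsets) {
        this.stopOffsets = new HashMap<>(stopOffsets);
        this.pending = new HashSet<>(stopOffsets.keySet());
    }

    /** True if a record at this offset lies inside the replay range for its partition. */
    synchronized boolean shouldProcess(String partition, long offset) {
        Long stop = stopOffsets.get(partition);
        return stop != null && offset < stop;
    }

    /**
     * Notes that a record at this offset was received. A partition counts as finished
     * once its last in-range offset (stop - 1) or anything beyond it has been seen.
     * Returns true when all tracked partitions are finished.
     */
    synchronized boolean markSeen(String partition, long offset) {
        Long stop = stopOffsets.get(partition);
        if (stop != null && offset >= stop - 1) {
            pending.remove(partition);
        }
        return pending.isEmpty();
    }
}
```

In the replay listener you would call shouldProcess before forwarding a record to the replay topic, and stop() the container once markSeen returns true. Note that a partition whose replay range is empty never delivers a record, so it would need to be left out of the map or handled separately.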

    I am not sure what you mean by #3.