Search code examples
springapache-kafkaspring-kafka

Restart listener and continue from latest message


Case

  1. Clients are ReplyingKafkaTemplate instances.
  2. Server is a ConcurrentMessageListenerContainer created using @KafkaListener and @SendTo annotations on a method.
  3. ContainerFactory uses ContainerStoppingErrorHandler.
  4. Request topic has only 1 partition.
  5. Group ids are static. eg. test-consumer-group.
  6. Requests are sent with timeouts.
  7. Due to an exception thrown, server goes down but the client keeps dispatching requests which queue up on the request topic.

Current Behavior

When the server comes back up it continues processing old requests which would have timed out.

Desired Behavior

Instead, it would be better to continue with the last message; thereby skipping past even unprocessed messages as corresponding requests would timeout and retry.

Questions

  1. What is the recommended approach to achieve this?
  2. From the little that I understand, it looks like I'll have to manually set the initial offset. What's the simplest way to implement this?

Solution

  • Your @KafkaListener class must extends AbstractConsumerSeekAware and do something like this:

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            super.onPartitionsAssigned(assignments, callback);
            callback.seekToEnd(assignments.keySet());
        }
    

    So, every time when your consumer joins the group it is going to seek all the assigned partitions to the end skipping all the old records.