My use-case is to set consumer group offset based on timestamp. For this I am using seekToTimestamp method of ConsumerSeekCallback inside onPartitionsAssigned() method of ConsumerSeekAware.
Now when I started my application it seeks to the timestamp I specified but during rebalancing, it seeks to that timestamp again.
I want this to happen only when if ConsumerGroup Offset is less than the offsets at that particular timestamp, if it's greater than that then it should not seek.
Is there a way we can achieve this or does Spring-Kafka provides some listeners for the new ConsumerGroup so when the new consumer group gets created it will invoke seek based on timestamp otherwise will use the existing offsets?
public class KafkaConsumer implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = 1623775969;
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}
Just add a boolean field (boolean seeksDone;
) to your implementation; set it to true after seeking and only seek if it is false.
You have to decide, though, what to do if you only get partitions 1 and 3 on the first rebalance and 1, 2, 3, 4 on the next.
Not an issue if you only have one application instance, of course. But, if you need to seek each partition when it is first assigned, you'll have to track the state for each partition.