So the assumption here is that a topic has a consumer group with multiple consumers over multiple partitions. That said, the same solution may be necessary even with one partition and one consumer per topic, when the consumer delegates long-running tasks asynchronously to actors or threads: messages can then complete out of order.
So what needs to be done here is to use manual consumer.commit() or consumer.commitAsync() calls, so that a rebalance, or resuming after disaster recovery, guarantees data integrity: no omissions and no duplicate runs once things are back up and running again.
To achieve that, the other thing to consider is what a committed offset must actually mean: every message at an earlier offset has been fully completed, and processed only once.
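For reference, here is a minimal sketch of the baseline manual-commit pattern with Confluent.Kafka (the broker address, group id, topic name, and the ProcessMessage handler are placeholders I made up, not part of my actual setup). This is only safe while processing stays synchronous inside the poll loop, which is exactly what breaks once long tasks are handed off to threads or actors:

using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "my-consumer-group",         // placeholder
    EnableAutoCommit = false,              // take over commit responsibility
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("my-topic");            // placeholder

while (true)
{
    var result = consumer.Consume(CancellationToken.None);
    ProcessMessage(result.Message.Value);  // hypothetical synchronous handler
    consumer.Commit(result);               // commits result.Offset + 1
}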
I have heard about a customized 'Committer' service which, for example, when a commit request comes in for offset 102, makes sure that committing offset 102 is actually safe by checking that all messages from the current committed offset (let's say it's at 90) up to the requested offset (102) have been processed.
I wonder whether this type of custom service is actually necessary and, if so, what the best way to achieve it would be: maybe some central stateful service (Redis or some database) to keep the committed history and commit requests. Or is that overkill, and does Kafka actually provide some feature(s) for this type of scenario?
Since enough time has passed, I will try to answer myself. The code below collects the offsets of completed messages and commits only when every offset from the minimum tracked value up to the highest is sequential, i.e. no in-flight message is missing in between. After a successful commit it resets (empties) the set down to the committed offset, which raises the minimum baseline and minimizes the iterative checks. The one thing I couldn't find a better way around: I am sending the commit request as a List of TopicPartitionOffset, even though I actually only ever need a single-offset commit with my current setting (one partition and one consumer per topic). The method below also needs to be wrapped to make it thread-safe, e.g. with a lock (see the usage sketch after the code).
// Requires: using System.Collections.Generic; using System.Linq; using Confluent.Kafka;

// Tracks the completed offsets per topic/partition; after each commit the
// last committed offset is kept as the baseline entry of the set.
private readonly Dictionary<string, SortedSet<long>> perPartitionSortedOffsets
    = new Dictionary<string, SortedSet<long>>();

private void ValidateCommit<T>(IConsumer<Null, T> consumer, ConsumeResult<Null, T> result)
{
    var topic = result.Topic;
    var partitionVal = result.Partition.Value;
    var offset = result.Offset.Value;
    var key = $"{topic}:{partitionVal}"; // one unique key per topic/partition

    if (!perPartitionSortedOffsets.ContainsKey(key))
    {
        perPartitionSortedOffsets[key] = new SortedSet<long>();
    }
    perPartitionSortedOffsets[key].Add(offset);

    var offsets = perPartitionSortedOffsets[key].ToArray();
    var gapFound = false;
    var offsetToCommit = offsets[0]; // also covers the single-element case
    for (int i = 1; i < offsets.Length; i++)
    {
        if (offsets[i] != offsets[i - 1] + 1)
        {
            // A message between the baseline and this offset has not
            // completed yet, so committing past it would not be safe.
            gapFound = true;
            break;
        }
        offsetToCommit = offsets[i];
    }

    if (!gapFound)
    {
        // Kafka expects the offset of the NEXT message to consume, hence +1;
        // committing offsetToCommit itself would reprocess it after a restart.
        var committedOffset = new TopicPartitionOffset(
            new TopicPartition(result.Topic, result.Partition),
            new Offset(offsetToCommit + 1));
        consumer.Commit(new List<TopicPartitionOffset> { committedOffset });

        // Reset the set to the committed baseline to keep later scans short.
        perPartitionSortedOffsets[key] = new SortedSet<long> { offsetToCommit };
    }
}
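As a usage sketch (the lock object and the callback name are my own assumption, not something Kafka prescribes): since workers finish on different threads, the simplest way to make ValidateCommit thread-safe is to serialize the calls with a lock.

private readonly object commitLock = new object();

// Invoked by each worker/actor/thread when it finishes processing a message.
private void OnMessageProcessed<T>(IConsumer<Null, T> consumer, ConsumeResult<Null, T> result)
{
    lock (commitLock)
    {
        ValidateCommit(consumer, result);
    }
}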