java azure azure-cosmosdb azure-cosmosdb-changefeed

Cosmos Change Feed Processor Lag is far more than number of records in collection


I'm using the Cosmos Change Feed Processor from Java in my application to consume the change feed of a Cosmos DB NoSQL container.

  1. As per the documentation, with the change feed processor approach, all inserts/updates made before the point at which we start consuming the change feed are delivered as a single snapshot.

  2. As I'm doing this in a non-production environment (testing before doing it in prod), the container has not had a substantial number of inserts/updates since my change feed consumption began.

From the above two points, we can conclude that the estimated lag returned by the change feed processor (while running and consuming updates) should not be many times higher than the total number of documents in the container.

However, I see an estimated lag of ~130 million, whereas I only have ~7 million records in my container.

My container has only 1 physical partition (and hence only 1 instance of the change feed processor running), and below is the code I'm using to calculate the estimated lag.

AtomicInteger totalLag = new AtomicInteger();
List<ChangeFeedProcessorState> currentState = changeFeedProcessor.getCurrentState().block();
if (CollectionUtils.isEmpty(currentState)) {
    System.out.println("Unexpected METRICS :: STATES is empty");
    continue;
}
for (ChangeFeedProcessorState changeFeedProcessorState : currentState) {
    totalLag.addAndGet(changeFeedProcessorState.getEstimatedLag());
}
System.out.println(totalLag.get());

Can someone please share their expertise on this?


Solution

  • TL;DR: the Estimated Lag is absolutely not a "number of documents left to be processed" if you read what the documentation literally says (not much!) rather than what you'd like it to say. The 'estimator' term is also clearly intended to convey that this is not, and cannot be, an exact metric.


    The lag is a function of the currently checkpointed 'position' and the position identifier of the most recent write. This position is also (approximately) what's used for continuation tokens. Each write or batch of writes (even an update) moves it forward. You can't rely on or assume that there are no gaps (think of any number of reasons, such as replication or rolled-back work).

    In other words, if you do an insert and 20 updates on one document, that will move the position forward by 21 or more. If you update 2 docs in a single logical partition, I think that might only move it by one.

    There are stacks of other factors, though. For example, when a physical partition splits, the positions stay the same, but half the data goes to one side and half to the other. If you try to do the math, you'll think there's twice as much work to do across the two 'new partitions' compared with what the original partition's 'lag' value was.
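
To make the two effects above concrete, here is a toy model in plain Java. It is only a sketch of the reasoning, not the SDK's implementation: `LagModel`, `Partition`, `write` and `split` are hypothetical names invented for this illustration, and it assumes each write advances the position by exactly one (the real service may advance it by more, or less for batched writes).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LagModel {

    /** Toy physical partition: every write advances a single position counter. */
    static final class Partition {
        final Set<String> documentIds = new HashSet<>();
        long latestPosition;      // position of the most recent write
        long checkpointPosition;  // last position the consumer has checkpointed

        /** Inserts and updates look the same here: both bump the position (assumed +1). */
        void write(String documentId) {
            documentIds.add(documentId);
            latestPosition++;
        }

        /** A gap between positions, not a count of distinct documents. */
        long estimatedLag() {
            return latestPosition - checkpointPosition;
        }

        /** Split: documents are divided, but both children keep the parent's positions. */
        List<Partition> split() {
            Partition left = new Partition();
            Partition right = new Partition();
            int i = 0;
            for (String id : documentIds) {
                (i++ % 2 == 0 ? left : right).documentIds.add(id);
            }
            for (Partition child : List.of(left, right)) {
                child.latestPosition = latestPosition;
                child.checkpointPosition = checkpointPosition;
            }
            return List.of(left, right);
        }
    }

    public static void main(String[] args) {
        Partition parent = new Partition();
        parent.write("doc-1");                  // one insert
        for (int i = 0; i < 20; i++) {
            parent.write("doc-1");              // twenty updates of the same document
        }
        System.out.println("documents=" + parent.documentIds.size()
                + " lag=" + parent.estimatedLag());              // documents=1 lag=21

        long summed = 0;
        for (Partition child : parent.split()) {
            summed += child.estimatedLag();
        }
        System.out.println("summed lag after split=" + summed);  // summed lag after split=42
    }
}
```

One document ends up with a lag of 21, and summing the children after a split doubles the figure without any new work having appeared. Both are consistent with seeing a lag far above the document count.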


    Aside from doing an actual traversal of the real data (which would have lots of side effects due to the associated RU consumption), there simply is no way to know what the actual gap is in terms of documents.

    Luckily, this doesn't truly matter for the bulk of real-world cases; there's a natural variability to the throughput that can be attained (and the consistency with which that can be achieved) for anything you attach to a change feed. IME very few interesting systems have a sufficiently stable and consistent processing cost per document.

    The best thing is to put it on a chart and use it as an approximate indicator when comparing reasonably correlated situations (the same number of documents, with the same processing cost, and equivalent processing power handling the items coming off the feed).
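
One way to treat the value as a trend rather than a count is sketched below in plain Java. `LagTrend` is a hypothetical helper, not part of the Cosmos SDK. It assumes you feed it the summed lag from your own polling loop (for example the `totalLag` value in the question) together with a timestamp, and the sample figures in `main` are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class LagTrend {

    private static final class Sample {
        final long epochSeconds;
        final long lag;

        Sample(long epochSeconds, long lag) {
            this.epochSeconds = epochSeconds;
            this.lag = lag;
        }
    }

    private final Deque<Sample> window = new ArrayDeque<>();
    private final int capacity;

    LagTrend(int capacity) {
        if (capacity < 2) {
            throw new IllegalArgumentException("capacity must be at least 2");
        }
        this.capacity = capacity;
    }

    /** Record one observation, evicting the oldest once the window is full. */
    void record(long epochSeconds, long lag) {
        if (window.size() == capacity) {
            window.removeFirst();
        }
        window.addLast(new Sample(epochSeconds, lag));
    }

    /** Average change in lag per second across the window; negative means catching up. */
    double changePerSecond() {
        if (window.size() < 2) {
            return 0.0;
        }
        Sample first = window.peekFirst();
        Sample last = window.peekLast();
        long elapsed = last.epochSeconds - first.epochSeconds;
        return elapsed == 0 ? 0.0 : (double) (last.lag - first.lag) / elapsed;
    }

    public static void main(String[] args) {
        LagTrend trend = new LagTrend(5);
        trend.record(0, 130_000_000L);    // hypothetical samples, one per minute
        trend.record(60, 129_400_000L);
        trend.record(120, 128_800_000L);
        System.out.println(trend.changePerSecond());  // -10000.0
    }
}
```

The sign and rough size of the slope are what matter. The absolute lag figure is only useful when compared against runs with a similar workload.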