Search code examples
.netazureazure-cosmosdbazure-cosmosdb-changefeed

Change Feed Processor Issue


I am trying to test change feed processor in .net. I have tried to use the change feed processor from the beginning (as mentioned in documentation). When I start the change feed processor, it is running as expected, if I make a change in COSMOS db, it is triggering HandleChanges method. I wanted to test one scenario: I have stopped my change feed processor locally, made 2 changes to cosmos db and started the processor, this time the processor was only picking the latest change. Why is this? Am I missing something in the code?

This is my code:

    public class ChangeFeedListener 
    {
        private static CosmosClient _cosmosClient;
        private static Database _productDatabase;
        private static Container _productContainer;
        private static Container _productLeaseContainer;
        private IAuditMessenger _auditMessenger = null;

        public ChangeFeedListener(IAuditMessenger auditMessenger,TelemetryClient telemetryClient)
        {
            _auditMessenger = auditMessenger;
            _telemetryClient = telemetryClient;
        }
        public async Task StartListener(CancellationToken cancellationToken)
        {
            
            _cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
            _productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
            _productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
            _productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);

            await StartChangeFeedProcessorAsync(_cosmosClient);
        }

        private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
            CosmosClient cosmosClient)
        {
            

            Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
            {
                if (exception is ChangeFeedProcessorUserException userException)
                {
                    //handle
                }
                else
                {
                    //handle
                }

                return Task.CompletedTask;
            };

            

            Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);

            string processorName = "abc";
            string instanceName = "test";

            

            ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
                .GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
                .WithErrorNotification(onErrorAsync)
                .WithInstanceName(instanceName) 
                .WithLeaseContainer(leaseContainer)
                .WithStartTime(DateTime.MinValue.ToUniversalTime())
                .Build();


            
            await changeFeedProcessor.StartAsync();

            
            
            return changeFeedProcessor;
        }

        private async Task HandleChangesAsync(
            IReadOnlyCollection<ABCItem> changes,
            CancellationToken cancellationToken)
        {
            //handler code
        }

    }
}

Solution

  • As I was using service fabric explorer, even after I stopped local debugging, the listener continued to run in the background, hence it was processing them in the background. There was no data loss and it worked fine. I realized this when the continuation token with TS seemed to changed even after I stopped the listener