
Update EventHub Partition Offset Checkpoint on Azure.Messaging.EventHubs.EventProcessorClient When Idle


In my scenario I will have batches of events coming in all at once and then long periods of time when the EventHub will be idle. In my processor client I want to checkpoint every N events or N minutes (whichever comes first).

Here is how I've set up my Azure.Messaging.EventHubs.EventProcessorClient:

```csharp
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;

// Start the stopwatch used for time-based checkpointing
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();

// Start the processing
await processor.StartProcessingAsync();

// Report progress every 10 seconds while the processor runs in the background
while (true)
{
    await Task.Delay(TimeSpan.FromSeconds(10));
    Console.WriteLine($"{eventsProcessed} events have been processed");
}
```

In ProcessEventHandler I check both eventsProcessedSinceLastCheckpoint and the time elapsed on the stopwatch. When either reaches its threshold, I reset both, write a checkpoint, and log it to the console:

```csharp
// The processor's ProcessEventAsync event expects Func<ProcessEventArgs, Task>,
// so an async handler should return Task (not Task<Task>)
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    ++eventsProcessed;
    ++eventsProcessedSinceLastCheckpoint;

    Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

    // After every 100 events or 2 minutes, whichever occurs first, add a checkpoint
    if (eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
    {
        eventsProcessedSinceLastCheckpoint = 0;
        _checkpointStopWatch.Restart();

        await eventArgs.UpdateCheckpointAsync();
        Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
    }
}
```

The eventsProcessedSinceLastCheckpoint check works as expected, because ProcessEventHandler fires whenever new events arrive. However, ProcessEventHandler is not called while the EventHub is idle, so when the hub stays quiet for minutes or hours, the time-based checkpoint never happens.

I understand that I could simply remove the timer, since my processor should be able to handle duplicate events if a crash occurs between checkpoints. But because my scenario has such long idle times, I want to use that time to catch up on checkpointing and avoid extra duplicates where I can. Hence the timer as a fallback during idle periods.

My question is: how can I call UpdateCheckpointAsync() outside of ProcessEventHandler? The method only seems to exist on ProcessEventArgs. I cannot call it directly on EventProcessorClient, which would be ideal because I could then move the timer check out of ProcessEventHandler and into my while loop.


Solution

  • Setting the EventProcessorClientOptions.MaximumWaitTime property when creating your processor instance causes your handler to be called even when no events have been read. When set to a non-null value, the wait time essentially means "deliver events as soon as they arrive, but invoke my handler if no events have been read within this interval." In those idle invocations, ProcessEventArgs.HasEvent is false.

    For updating checkpoints in this scenario, the recommended approach is to cache the arguments of the last event dispatched to the handler for each partition and use them to call UpdateCheckpointAsync, since the arguments passed during an idle invocation carry no event to checkpoint. This sample demonstrates the approach and also ensures that a checkpoint is written when processing for a partition stops.
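Below is a minimal sketch of the bookkeeping behind the "cache the last event's arguments" approach, combined with the question's "N events or N minutes, whichever comes first" rule. It is plain C# and generic over the cached arguments type so the logic can be exercised without an Event Hub. In a real processor, TArgs would be ProcessEventArgs, hasEvent would be eventArgs.HasEvent, and the partition ID would come from eventArgs.Partition.PartitionId. The class and method names (IdleAwareCheckpointTracker, ShouldCheckpoint, MarkCheckpointed) are hypothetical, and the sketch assumes that handler calls for any single partition do not overlap, while different partitions may be handled concurrently.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of Azure.Messaging.EventHubs): remembers, per partition,
// the arguments of the last dispatched event and decides when a checkpoint is due
// using "maxEvents events or maxInterval elapsed, whichever comes first".
public sealed class IdleAwareCheckpointTracker<TArgs>
{
    private sealed class PartitionState
    {
        public TArgs LastArgs;                 // last args that actually carried an event
        public bool HasPending;                // any events since the last checkpoint?
        public int Count;                      // number of events since the last checkpoint
        public DateTimeOffset LastCheckpoint;  // time of the last checkpoint (or first sighting)
    }

    private readonly ConcurrentDictionary<string, PartitionState> _partitions =
        new ConcurrentDictionary<string, PartitionState>();
    private readonly int _maxEvents;
    private readonly TimeSpan _maxInterval;

    public IdleAwareCheckpointTracker(int maxEvents, TimeSpan maxInterval)
    {
        _maxEvents = maxEvents;
        _maxInterval = maxInterval;
    }

    // Call on every handler invocation, including idle ones (hasEvent == false).
    // Returns true when a checkpoint is due; checkpointArgs is then the cached
    // args of the most recent event seen for that partition.
    public bool ShouldCheckpoint(string partitionId, bool hasEvent, TArgs args,
                                 DateTimeOffset now, out TArgs checkpointArgs)
    {
        PartitionState state = _partitions.GetOrAdd(
            partitionId, _ => new PartitionState { LastCheckpoint = now });

        if (hasEvent)
        {
            // Only args that carry an event are cached; idle pings never overwrite them.
            state.LastArgs = args;
            state.HasPending = true;
            state.Count++;
        }

        // No new events since the last checkpoint means nothing to write, however long the idle period.
        bool due = state.HasPending &&
                   (state.Count >= _maxEvents || now - state.LastCheckpoint >= _maxInterval);

        checkpointArgs = due ? state.LastArgs : default;
        return due;
    }

    // Call after the checkpoint has been written to reset the partition's counters.
    public void MarkCheckpointed(string partitionId, DateTimeOffset now)
    {
        if (_partitions.TryGetValue(partitionId, out PartitionState state))
        {
            state.Count = 0;
            state.HasPending = false;
            state.LastCheckpoint = now;
        }
    }
}

// Hypothetical wiring inside the question's handler (requires Azure.Messaging.EventHubs, with
// new EventProcessorClientOptions { MaximumWaitTime = ... } passed to the EventProcessorClient constructor):
//
//   var tracker = new IdleAwareCheckpointTracker<ProcessEventArgs>(100, TimeSpan.FromMinutes(2));
//   ...
//   string id = eventArgs.Partition.PartitionId;
//   if (tracker.ShouldCheckpoint(id, eventArgs.HasEvent, eventArgs, DateTimeOffset.UtcNow, out var last))
//   {
//       await last.UpdateCheckpointAsync(eventArgs.CancellationToken);
//       tracker.MarkCheckpointed(id, DateTimeOffset.UtcNow);
//   }
```

Measuring the time threshold from the last checkpoint mirrors the question's stopwatch, and the HasPending flag keeps an idle partition from rewriting the same checkpoint on every MaximumWaitTime ping.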