I have an API built with ASP.NET Core 3.1 that uses Azure.Messaging.EventHubs and Azure.Messaging.EventHubs.Processor. It reads events from a consumer group and forwards them to a SignalR hub. The processor runs only while users are connected to the hub, and it stops when the last user disconnects. Checkpoints are stored in Azure Blob Storage.
The current processing logic for each event is: if the difference in minutes between DateTime.UtcNow and the event timestamp is less than 2, send the event to the SignalR hub; otherwise, do nothing.
The problem is as follows: sometimes the EventProcessorClient is stopped for a long period, and many events accumulate in the event hub. When the processor starts again, it takes a long time to slowly catch up to the most recent events, and the SignalR hub receives nothing until it does. This catch-up period is far too long, especially since we receive hundreds of events per minute.
Is there a way to, for example, manually move the checkpoint before starting the processor? Or to read only the events from the last X minutes? Or is there another idea/solution?
PS: I don't care about events older than 2 to 5 minutes for this consumer group.
PS2: The retention time configured in the EventHub is 1 day.
The code:
/* properties and stuff */
// Constructor
public BusEventHub(ILogger<BusEventHub> logger, IConfiguration configuration, IHubContext<BusHub> hubContext) {
    _logger = logger;
    Configuration = configuration;
    _busExcessHub = hubContext;
    /* Connection strings and stuff */
    // Create a blob container client that the event processor will use
    storageClient = new BlobContainerClient(this.blobStorageConnectionString, this.blobContainerName);
    // Create an event processor client to process events in the event hub
    processor = new EventProcessorClient(storageClient, consumerGroup, this.ehubNamespaceConnectionString, this.eventHubName);
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
}
public async Task Start() {
    _logger.LogInformation($"Starting event processing for EventHub {eventHubName}");
    await processor.StartProcessingAsync();
}
public async Task Stop() {
    if (BusHubUserHandler.ConnectedIds.Count < 2) {
        _logger.LogInformation($"Stopping event processing for EventHub {eventHubName}");
        await processor.StopProcessingAsync();
    } else {
        _logger.LogDebug("There are still other users connected");
    }
}
private async Task ProcessEventHandler(ProcessEventArgs eventArgs) {
    try {
        string receivedEvent = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
        _logger.LogDebug($"Received event: {receivedEvent}");
        BusExcessMinified busExcess = BusExcessMinified.FromJson(receivedEvent);
        double timeDiff = (DateTime.UtcNow - busExcess.Timestamp).TotalMinutes;
        if (timeDiff < 2) {
            string responseEvent = busExcess.ToJson();
            _logger.LogDebug($"Sending message to BusExcess Hub: {responseEvent}");
            await _busExcessHub.Clients.All.SendAsync("UpdateBuses", responseEvent);
        }
        _logger.LogDebug("Update checkpoint in the blob storage"); // So that the service receives only new events the next time it's run
        await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    } catch (OperationCanceledException) {
        // Also catches TaskCanceledException, which derives from OperationCanceledException
        _logger.LogInformation("The EventHub event processing was stopped");
    } catch (Exception e) {
        // Pass the exception object so the logger keeps the full stack trace
        _logger.LogError(e, "Exception while processing an EventHub event");
    }
}
/* ProcessErrorHandler */
It is possible to request an initial position for partitions as they're initialized, which will allow you to specify the enqueue time as your starting point. This sample illustrates the details. The caveat is that the initial position is only used when there is no checkpoint for a partition; checkpoints will always take precedence.
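A minimal sketch of that handler, assuming the processor is set up as in the question. `EventProcessorClient.PartitionInitializingAsync`, `PartitionInitializingEventArgs.DefaultStartingPosition`, and `EventPosition.FromEnqueuedTime` are part of the processor library; the class name `StartingPositionSketch`, the `StartTime` helper, and the 5-minute `LookBack` value are illustrative choices, not anything from the library.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

public static class StartingPositionSketch
{
    // Hypothetical window: how far back to start reading a partition that has no checkpoint.
    public static readonly TimeSpan LookBack = TimeSpan.FromMinutes(5);

    // Computes the enqueued-time cutoff used as the starting position.
    public static DateTimeOffset StartTime(DateTimeOffset now, TimeSpan lookBack) => now - lookBack;

    // Handler for EventProcessorClient.PartitionInitializingAsync.
    // The position set here is used ONLY when the partition has no checkpoint;
    // an existing checkpoint always wins.
    public static Task InitializeFromRecentTime(PartitionInitializingEventArgs args)
    {
        if (args.CancellationToken.IsCancellationRequested)
        {
            return Task.CompletedTask;
        }

        args.DefaultStartingPosition =
            EventPosition.FromEnqueuedTime(StartTime(DateTimeOffset.UtcNow, LookBack));

        return Task.CompletedTask;
    }
}
```

The handler must be registered before `StartProcessingAsync` is called, for example with `processor.PartitionInitializingAsync += StartingPositionSketch.InitializeFromRecentTime;` in the constructor. Because `DateTimeOffset.UtcNow` is read when each partition initializes, every restart computes a fresh cutoff.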
From the scenario that you're describing, it sounds as if checkpoints aren't useful to you and are getting in the way of your preferred usage pattern. If there aren't other mitigating factors, I'd recommend never checkpointing and instead overriding the default starting position to dynamically reset to the time that you're interested in.
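A sketch of a processing handler that never checkpoints, under a few assumptions. It filters on the service-assigned `EventData.EnqueuedTime` instead of the payload's `BusExcessMinified.Timestamp`, so switch back to the payload field if the producer's clock is what matters. The `forward` delegate is a hypothetical stand-in for the question's `_busExcessHub.Clients.All.SendAsync("UpdateBuses", ...)` call, and `NoCheckpointHandlerSketch` and `IsFresh` are illustrative names.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

public class NoCheckpointHandlerSketch
{
    // Same 2-minute freshness rule as the question's handler.
    private static readonly TimeSpan MaxAge = TimeSpan.FromMinutes(2);

    // Hypothetical stand-in for the SignalR send in the question.
    private readonly Func<string, Task> _forward;

    public NoCheckpointHandlerSketch(Func<string, Task> forward) => _forward = forward;

    // Pure freshness check, kept separate so it is easy to test.
    public static bool IsFresh(DateTimeOffset enqueuedTime, DateTimeOffset now, TimeSpan maxAge) =>
        now - enqueuedTime < maxAge;

    // Handler for EventProcessorClient.ProcessEventAsync.
    public async Task ProcessEventHandler(ProcessEventArgs args)
    {
        // HasEvent is false only if a maximum wait time is configured and no event arrived.
        if (!args.HasEvent)
        {
            return;
        }

        // EnqueuedTime is stamped by the Event Hubs service when the event is accepted.
        if (IsFresh(args.Data.EnqueuedTime, DateTimeOffset.UtcNow, MaxAge))
        {
            string body = Encoding.UTF8.GetString(args.Data.Body.ToArray());
            await _forward(body);
        }

        // Deliberately no UpdateCheckpointAsync call: with no checkpoints,
        // each start falls back to the PartitionInitializingAsync position.
    }
}
```

Dropping the per-event checkpoint also removes one Blob Storage write per event, which adds up at hundreds of events per minute.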
If you do, for some reason, need to checkpoint in addition to this, then your best bet is to delete the checkpoint data, since checkpoints are based on the offset and won't recognize an enqueued time for positioning.
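A rough sketch of deleting the checkpoint blobs while the processor is stopped, just before calling `StartProcessingAsync`. The blob-name layout below (`{namespace}/{eventHub}/{consumerGroup}/checkpoint/{partitionId}`, lowercased) is an assumption about how the processor's Blob Storage checkpoint store names its blobs, so list your container and confirm the prefix before deleting anything. `CheckpointResetSketch` and its methods are illustrative names; `GetBlobsAsync` and `DeleteBlobIfExistsAsync` are `BlobContainerClient` methods.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

public static class CheckpointResetSketch
{
    // ASSUMPTION: checkpoint blobs live under this lowercased prefix.
    // Verify against the blobs actually present in your container.
    public static string CheckpointPrefix(string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
        $"{fullyQualifiedNamespace}/{eventHubName}/{consumerGroup}/checkpoint/".ToLowerInvariant();

    // Deletes every blob under the prefix and returns how many were removed.
    // Call only while the processor is stopped.
    public static async Task<int> DeleteCheckpointsAsync(
        BlobContainerClient container, string prefix, CancellationToken cancellationToken = default)
    {
        int deleted = 0;
        await foreach (BlobItem blob in container.GetBlobsAsync(prefix: prefix, cancellationToken: cancellationToken))
        {
            var response = await container.DeleteBlobIfExistsAsync(blob.Name, cancellationToken: cancellationToken);
            if (response.Value)
            {
                deleted++;
            }
        }
        return deleted;
    }
}
```

In the question's `Start()`, this could run as `await CheckpointResetSketch.DeleteCheckpointsAsync(storageClient, CheckpointResetSketch.CheckpointPrefix(processor.FullyQualifiedNamespace, processor.EventHubName, processor.ConsumerGroup));` before `StartProcessingAsync`, so that the enqueued-time starting position applies again.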