Tags: c#, azure, azure-eventhub

Azure EventHubs throws Exception: At least one receiver for the endpoint is created with epoch of '0', and so non-epoch receiver is not allowed


Introduction

Hello all, we're currently working on a microservice platform that uses Azure Event Hubs and events to send data between the services. Let's call these services CustomerService, OrderService and MobileBFF.

The CustomerService mainly sends updates (as events), which the OrderService and MobileBFF store so that they can answer queries without having to call the CustomerService for that data.

All three services, plus our developers on the DEV environment, use the same consumer group to connect to these event hubs.

We currently use only 1 partition but plan to expand to multiple partitions later (our code is already written to read from multiple partitions).

Exception

Every now and then, though, we run into an exception. Once it starts, it usually keeps being thrown for an hour or so. So far we've only seen this error on the DEV/TEST environments.

The exception:

Azure.Messaging.EventHubs.EventHubsException(ConsumerDisconnected): At least one receiver for the endpoint is created with epoch of '0', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected.

Every consumer of the Event Hub stores the SequenceNumber of the last event it processed in its own SQL database, which lets each consumer consume events independently. When a service (re)starts, it loads the SequenceNumber from its database and requests events from that point onwards until no more events can be found. It then sleeps for 100 ms and retries. Here's the (somewhat simplified) code:

var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
string[] allPartitions = null;
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
{
    allPartitions = await consumer.GetPartitionIdsAsync(stoppingToken);
}

var allTasks = new List<Task>();

foreach (var partitionId in allPartitions)
{
    //Since C# 5 each foreach iteration gets its own variable, so this copy is optional (but harmless) inside Task.Run()
    var partitionIdInternal = partitionId;

    allTasks.Add(Task.Run(async () =>
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
                {
                    //Keep the scope (and the scoped messageProcessor) alive for as long as events are being handled
                    using (var scope = _serviceProvider.CreateScope())
                    {
                        var messageProcessor = scope.ServiceProvider.GetRequiredService<EventHubInboxManager<T, EH>>();
                        //Obtains starting position from the database or sets to "Earliest" or "Latest" based on configuration
                        EventPosition startingPosition = await messageProcessor.GetStartingPosition(_inboxOptions.InboxIdentifier, partitionIdInternal);

                        while (!stoppingToken.IsCancellationRequested)
                        {
                            bool processedSomething = false;
                            //With default ReadEventOptions (MaximumWaitTime = null) this enumeration waits for new events
                            //instead of completing when the partition has been drained
                            await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(partitionIdInternal, startingPosition, stoppingToken))
                            {
                                processedSomething = true;

                                startingPosition = await messageProcessor.Handle(partitionEvent);
                            }

                            if (processedSomething == false)
                            {
                                await Task.Delay(100, stoppingToken);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //Log error / delay / retry
            }
        }
    }));
}

await Task.WhenAll(allTasks);
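The GetStartingPosition call above is only described by its comment. A minimal sketch of the mapping it performs, assuming the database stores the sequence number of the last *processed* event (null when nothing is stored yet) and assuming a hypothetical `startFromEarliest` configuration flag; `StartingPositionSketch` and `ToStartingPosition` are illustrative names, not part of the original code:

```csharp
using Azure.Messaging.EventHubs.Consumer;

// Hypothetical helper; names are illustrative, not part of the original service.
public static class StartingPositionSketch
{
    // lastProcessedSequenceNumber: value loaded from the service's own SQL database,
    // or null when this consumer has never processed an event on the partition.
    // startFromEarliest: hypothetical configuration flag for the "no checkpoint yet" case.
    public static EventPosition ToStartingPosition(long? lastProcessedSequenceNumber, bool startFromEarliest)
    {
        if (lastProcessedSequenceNumber.HasValue)
        {
            // isInclusive: false resumes with the event *after* the one already handled,
            // so a restart does not process the last stored event a second time.
            return EventPosition.FromSequenceNumber(lastProcessedSequenceNumber.Value, isInclusive: false);
        }

        // No checkpoint yet: fall back to the configured default position.
        return startFromEarliest ? EventPosition.Earliest : EventPosition.Latest;
    }
}
```

Using an exclusive position keeps `Handle` from seeing the same event twice after a restart; if `Handle` instead returned the position of the *next* event, an inclusive position would be the matching choice.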

The exception is thrown on the following line:

await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
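The error surfaces as an `EventHubsException` whose `Reason` is `ConsumerDisconnected` (the `EventHubsException(ConsumerDisconnected)` prefix in the message above). A minimal sketch of how the `//Log error / delay / retry` branch could recognize this case separately; `RetrySketch` and the 30-second/1-second delays are assumptions for illustration, not a recommended policy:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;

// Hypothetical helper for the catch block of the reading loop.
public static class RetrySketch
{
    // True when the service refused this reader because another consumer
    // holds the partition with an owner level (epoch).
    public static bool IsOwnershipConflict(Exception ex) =>
        ex is EventHubsException ehEx &&
        ehEx.Reason == EventHubsException.FailureReason.ConsumerDisconnected;

    // Assumed policy: back off longer on ownership conflicts, because reconnecting
    // as a non-epoch reader keeps failing while the epoch reader is still connected.
    public static TimeSpan DelayFor(Exception ex) =>
        IsOwnershipConflict(ex) ? TimeSpan.FromSeconds(30) : TimeSpan.FromSeconds(1);

    public static Task BackOffAsync(Exception ex, CancellationToken cancellationToken) =>
        Task.Delay(DelayFor(ex), cancellationToken);
}
```

Inside the loop this would be `catch (Exception ex) { /* log */ await RetrySketch.BackOffAsync(ex, stoppingToken); }`; logging `IsOwnershipConflict(ex)` separately also makes it easier to correlate these errors with whichever other consumer is running.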

More investigation

The code above runs in the microservices, which are hosted as Azure App Services.

In addition, we run 1 Azure Function that also reads events from the Event Hub (it probably uses the same consumer group).

According to the documentation (https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups), there can be up to 5 concurrent readers on a partition per consumer group. The documentation recommends having only one active reader per partition per consumer group, but it's not clear to us what can go wrong if we don't follow this guidance.

We did some tests by manually spawning multiple instances of our event-reading service. With more than 5 instances this resulted in a different error, which stated quite clearly that there can be at most 5 consumers per partition per consumer group (or something similar).

Furthermore, it seems (we're not 100% sure) that this issue started when we rewrote the code above to spawn one thread per partition, even though the Event Hub only has 1 partition. Edit: after some more log digging, we also found a few of these exceptions from before the one-thread-per-partition code was merged.


Solution

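  • That exception indicates that another consumer is using the same consumer group and asserting exclusive access to the partition. Unless you're explicitly setting the OwnerLevel property in your read options (ReadEventOptions), the likely candidate is at least one EventProcessorClient running. The Azure Functions Event Hubs trigger is also built on an event processor, so the Azure Function mentioned in the question is a likely source if it shares the consumer group.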

    To remediate, you can:

    • Stop any event processors running against the same Event Hub and Consumer Group combination, and ensure that no other consumers are explicitly setting the OwnerLevel.

    • Run these consumers in a dedicated consumer group; this will allow them to co-exist with the exclusive consumer(s) and/or event processors.

    • Explicitly set the OwnerLevel to 1 or greater for these consumers; that will assert ownership and force any other consumers of that partition in the same consumer group with a lower (or no) owner level to disconnect.
      (note: depending on what the other consumer is, you may need to test different values here. The event processor types use 0, so anything above that will take precedence.)
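Options 2 and 3 above could look roughly like this. It is a minimal sketch, not the answerer's code: `ExclusiveReaderSketch`, the 5-second `MaximumWaitTime` and the example consumer group name are assumptions, and a dedicated consumer group has to be created on the Event Hub before a client can use it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

// Hypothetical helper illustrating a dedicated consumer group and OwnerLevel.
public static class ExclusiveReaderSketch
{
    // Option 3: OwnerLevel asserts exclusive ownership of the partition within the
    // consumer group; event processors use 0, so 1 outranks them.
    // MaximumWaitTime (assumed value) makes the reader emit an empty PartitionEvent
    // instead of waiting forever when no events arrive.
    public static ReadEventOptions CreateReadOptions(long ownerLevel = 1) => new ReadEventOptions
    {
        OwnerLevel = ownerLevel,
        MaximumWaitTime = TimeSpan.FromSeconds(5)
    };

    // Option 2: pass a dedicated consumer group (for example a hypothetical "orderservice"
    // group) instead of EventHubConsumerClient.DefaultConsumerGroupName.
    public static async Task ReadAsync(
        string consumerGroup, string connectionString, string eventHubName,
        string partitionId, EventPosition startingPosition,
        Func<PartitionEvent, Task> handle, CancellationToken cancellationToken)
    {
        await using var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

        await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(
            partitionId, startingPosition, CreateReadOptions(), cancellationToken))
        {
            // With MaximumWaitTime set, Data is null when the wait elapsed without an event.
            if (partitionEvent.Data != null)
            {
                await handle(partitionEvent);
            }
        }
    }
}
```

The two options work differently: a dedicated consumer group avoids the conflict because the reader no longer shares the partition with the event processor, whereas OwnerLevel wins the conflict, so the lower-level consumer (for example an event processor) stops receiving events from that partition while this reader is connected.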