Search code examples
c#azure-eventhub

Azure Event Hubs : QuotaExceededException : 4999 handles limit


I am using Azure Event Hub library for C# in WebApi App. And I got this exception on message sending:

"Message":"An error has occurred.",
"ExceptionMessage":"Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 4999. Please free up resources and try again., referenceId: d975f39c71a14ae5915c9adca322e110_G15"
"ExceptionType":"Microsoft.ServiceBus.Messaging.QuotaExceededException"

I thought that instantiantion of EventHubProducerClient one time and reusing it instead of creating its instance on each message sending (with IAsyncDisposable pattern) will help as mentioned here EventHub Exception :Cannot allocate more handles to the current session or connection. But it did not.

I believe there might be some more global issue. Might be missing something.

  • I am using single event hub with 7 consumer groups each of which is used by separate web application (single); well actually there is additional consumer group ($Default) but it is not used;
  • For receiving messages I use EventProcessorClient;
  • I use Azure.Messaging.EventHubs 5.2.0 and Azure.Messaging.EventHubs.Processor 5.2.0 packages;
  • Here is the whole code (did everything according to quickstart):
    public class EventHubService : SubscriberBase
    {
        private readonly Action<string> errorHandler;
        private readonly BlobContainerClient blobContainerClient;
        private readonly EventProcessorClient eventProcessorClient;
        private readonly EventHubProducerClient eventHubProducerClient;
        private readonly int eventsToCheckpoint;
        private readonly Timer checkpointTimer;
        private int eventsSinceLastCheckpoint;
        private bool shouldUpdateCheckpoint;

        public EventHubService(EventHubSettings settings, Action<string> errorHandler) : base()
        {
            this.errorHandler = errorHandler;
            eventHubProducerClient = new EventHubProducerClient(settings.ConnectionString, settings.EventHubName);

            if (!String.IsNullOrWhiteSpace(settings.GroupId))
            {
                eventManager = new EventManager();

                blobContainerClient = new BlobContainerClient(settings.StorageConnectionString, settings.BlobContainerName);
                eventProcessorClient = new EventProcessorClient(blobContainerClient, settings.GroupId, settings.ConnectionString, settings.EventHubName);

                eventsToCheckpoint = settings.EventsToUpdateCheckpoint;
                checkpointTimer = new Timer(settings.IntervalToUpdateCheckpoint.TotalMilliseconds);
                checkpointTimer.Elapsed += (sender, eventArgs) => shouldUpdateCheckpoint = true;
            }
        }

        public override void Start()
        {
            eventProcessorClient.ProcessErrorAsync += HandleError;
            eventProcessorClient.ProcessEventAsync += ProcessEventData;
            eventProcessorClient.StartProcessingAsync().Wait();
            checkpointTimer.Start();
        }

        public override async Task Stop()
        {
            try
            {
                checkpointTimer.Stop();
                await eventProcessorClient.StopProcessingAsync();
            }
            finally
            {
                eventProcessorClient.ProcessEventAsync -= ProcessEventData;
                eventProcessorClient.ProcessErrorAsync -= HandleError;
            }
        }

        public override async Task Publish(string topic, JObject message)
        {
            using (EventDataBatch eventBatch = await eventHubProducerClient.CreateBatchAsync())
            {
                var @event = new Event(topic, message);
                string json = @event.ToString(Formatting.None);
                byte[] bytes = Encoding.UTF8.GetBytes(json);
                var eventData = new EventData(bytes);
                eventBatch.TryAdd(eventData);

                await eventHubProducerClient.SendAsync(eventBatch);
            }
        }

        private async Task ProcessEventData(ProcessEventArgs eventArgs)
        {
            if (eventArgs.CancellationToken.IsCancellationRequested)
            {
                return;
            }

            if (++eventsSinceLastCheckpoint >= eventsToCheckpoint)
            {
                eventsSinceLastCheckpoint = 0;
                shouldUpdateCheckpoint = true;
            }

            if (shouldUpdateCheckpoint)
            {
                await eventArgs.UpdateCheckpointAsync();
                shouldUpdateCheckpoint = false;
            }

            string json = Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray());
            var @event = new Event(json);
            eventManager.TryRaise(@event);
        }

        private Task HandleError(ProcessErrorEventArgs eventArgs)
        {
            if (!eventArgs.CancellationToken.IsCancellationRequested)
            {
                errorHandler.Invoke($"[P:{eventArgs.PartitionId}][O:{eventArgs.Operation}] {eventArgs.Exception.Message}");
            }

            return Task.CompletedTask;
        }
    }  

I have found some info in Service Bus Quotas like:

Number of concurrent receive requests on a queue, topic, or subscription entity (5000).

Subsequent receive requests are rejected, and an exception is received by the calling code. This quota applies to the combined number of concurrent receive operations across all subscriptions on a topic.

But still can't figure how to deal with it.

Please help. Thanks.


Solution

  • This is indeed the answer EventHub Exception :Cannot allocate more handles to the current session or connection.

    I did similar "fix" for Azure Event Hub library for NET Core but I have forgotten that I am also using Azure Event Hub library for NET Framework!

    So I have instantiated EventHubProducerClient one time and reusing it now. Seems working fine.

    My bad. Was not attentive enough.