c# azure asp.net-core azure-eventhub

Why Does Azure EventHub Processing Hang When Stopping (EventProcessorClient.StopProcessingAsync)?


I use an ASP.NET Core 8 Web API to read events from Azure EventHub. The service that reads the events is inside a HostedService.

Before processing each event, I need to check the health of an external API, using Polly for retries. If the API is still down after all retry attempts, I try to stop the EventHub processing for a few minutes. However, the application hangs indefinitely when stopping the EventHub. It does not fail or throw any exceptions; it simply gets stuck.

This is the only information I got from Microsoft Docs:

When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.

Summary:

  • I read events from Azure EventHub
  • I check an API's health using Polly's retry mechanism
  • If Polly exhausts all retries and the API remains unavailable, I stop the EventHub
  • The application hangs indefinitely upon stopping the EventHub

The StopProcessingAsync() call seems to cause the EventHub processor to hang indefinitely.

The application does not throw an exception, and no errors are logged.

Could this be a deadlock issue? If so, how can I properly stop processing and resume later?
What is the recommended approach to stop EventHub processing?

public class EventHubServiceTask : IEventHubServiceTask
{
    private CancellationToken _startCancellationToken;

    public EventHubServiceTask()
    {
        var storageClient = new BlobContainerClient(_blobStorageSettings.ConnectionString, _blobStorageSettings.ContainerName);
        _eventProcessorClient = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, _eventHubSettings.ConnectionString, _eventHubSettings.EventHubName);
        _eventProcessorClient.ProcessEventAsync += ProcessEventHandler;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _startCancellationToken = cancellationToken;
        await _eventProcessorClient.StartProcessingAsync(cancellationToken);
    }

    private async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        MyEvent? myevent = JsonConvert.DeserializeObject<MyEvent>(eventArgs.Data.EventBody.ToString()) ?? new MyEvent();

        if (await ServiceHealthCheckNotAvailable(myevent.CorrelationId, eventArgs.CancellationToken))
        {
            await HandleHealthCheckFailureAsync(myevent.CorrelationId, eventArgs.CancellationToken);
        }

        await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
    }

    private async Task<bool> ServiceHealthCheckNotAvailable(Guid correlationId, CancellationToken cancellationToken)
    {
        return !await CheckHealthAsync(correlationId, cancellationToken);
    }

    private async Task HandleHealthCheckFailureAsync(Guid correlationId, CancellationToken cancellationToken)
    {
        try
        {
            await _eventProcessorClient.StopProcessingAsync(_startCancellationToken);
            await Task.Delay(TimeSpan.FromSeconds(_retryPolicySettings.EventHubSleepTimeSeconds), cancellationToken);
            await _eventProcessorClient.StartProcessingAsync(cancellationToken);
        }
        catch (Exception e)
        {
            throw;
        }
    }

    private async Task<bool> CheckHealthAsync(Guid correlationId, CancellationToken cancellationToken)
    {
        var policy = Policy
                        .HandleResult<bool>(result => !result)
                        .WaitAndRetryAsync(_retryPolicySettings.RetryCount, retryAttempt => TimeSpan.FromSeconds(retryAttempt));

        return await policy.ExecuteAsync(async () => await _service.IsHealthyAsync(correlationId, cancellationToken));
    }
}

Solution

  • In a nutshell, you're violating the documented guidance not to stop the processor from within your event processing handler, because doing so will deadlock. (see: EventProcessorClient.ProcessEventAsync)

    Why is there a deadlock?

    Any call that you make to HandleHealthCheckFailureAsync from ProcessEventHandler is going to cause a deadlock. When stopping, the processor explicitly waits for all outstanding handlers to complete, and your handler is explicitly waiting for stopping to complete before it finishes executing. Neither can make forward progress.
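    The circular wait can be reproduced without the Event Hubs SDK. The following is a minimal sketch, assuming .NET 5 or later; ToyProcessor and DeadlockDemo are hypothetical stand-ins, not the real EventProcessorClient, and they model only the one behavior that matters here: stopping waits for the running handler.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the processor. It models only one behavior:
// StopAsync does not complete until the running handler has completed.
public sealed class ToyProcessor
{
    private Task _runningHandler = Task.CompletedTask;

    public void Dispatch(Func<Task> handler)
    {
        // The gate guarantees _runningHandler is assigned before the handler body runs.
        var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        _runningHandler = RunAsync(gate.Task, handler);
        gate.SetResult();
    }

    // "Wait for outstanding handlers" is reduced to returning the handler's task.
    public Task StopAsync() => _runningHandler;

    private static async Task RunAsync(Task start, Func<Task> handler)
    {
        await start;
        await handler();
    }
}

public static class DeadlockDemo
{
    // Returns true if stopping finished within the timeout, false if it hung.
    public static async Task<bool> StopCompletesWhenCalledFromHandlerAsync()
    {
        var processor = new ToyProcessor();

        // The handler awaits StopAsync, and StopAsync awaits the handler.
        processor.Dispatch(async () => await processor.StopAsync());

        Task stop = processor.StopAsync();
        Task first = await Task.WhenAny(stop, Task.Delay(TimeSpan.FromMilliseconds(500)));
        return ReferenceEquals(first, stop);
    }
}
```

    In this sketch the stop task never completes, because the handler's task is waiting on itself. The real processor is more involved, but the shape of the wait is the same as calling StopProcessingAsync from inside ProcessEventHandler.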

    Why do I not want to stop/start the processor for this?

    You shouldn't be stopping the processor to apply a delay. It's a very heavy operation that will have a big performance impact even in the best case, as all outstanding calls need to complete during shutdown and ownership is fully surrendered. When restarting, partitions will be claimed one by one and restarted from the last recorded checkpoint.

    In the worst case, you'll see your partition ownership jumping around between processor instances, each time rewinding the stream to the last recorded checkpoint, which results in duplicates being processed.

    What should I be doing instead?

    Handlers do not have time constraints when called - the processor will wait forever for your handler to complete without complaining about it. As a result, it is safe to apply a delay in your handler while the processor is running.
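    As a minimal sketch of delaying inside the handler, the helper below keeps probing until the dependency reports healthy and sleeps between attempts. HealthGate and WaitUntilHealthyAsync are hypothetical names introduced for illustration; the probe delegate stands in for something like the question's CheckHealthAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Event Hubs SDK.
public static class HealthGate
{
    // Loops until the probe reports healthy, pausing between attempts.
    // Returns the number of failed checks observed before success.
    public static async Task<int> WaitUntilHealthyAsync(
        Func<CancellationToken, Task<bool>> isHealthyAsync,
        TimeSpan pause,
        CancellationToken cancellationToken)
    {
        int failedChecks = 0;

        while (!await isHealthyAsync(cancellationToken))
        {
            failedChecks++;

            // Throws OperationCanceledException if the token is signaled,
            // which lets the handler exit promptly instead of sleeping on.
            await Task.Delay(pause, cancellationToken);
        }

        return failedChecks;
    }
}
```

    In the question's handler, a call such as HealthGate.WaitUntilHealthyAsync(ct => CheckHealthAsync(myevent.CorrelationId, ct), pause, eventArgs.CancellationToken) could take the place of HandleHealthCheckFailureAsync. Passing eventArgs.CancellationToken matters, since a handler that ignores cancellation would hold up a legitimate shutdown. Note that this only pauses the partition whose handler is waiting.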

    In your scenario, it seems that you'd want to pause processing for all partitions. Since the event processing handler will be invoked concurrently (up to one invocation per partition owned), you'd need to coordinate for your delay.

    One approach would be to use a reader/writer lock, where your processing handler takes a reader lock and your health check takes a writer lock when it fails, releasing it after your desired delay. I've also seen a manual reset event used for this scenario. This is fairly easy to implement, though it comes at a performance cost and requires a synchronous primitive, which will impact the host's thread pool and may reduce the resources available for other work on the host (like handling requests to pages/service endpoints).
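    The manual reset event variant might look roughly like the sketch below. ProcessingGate is a hypothetical class written for illustration; it uses ManualResetEventSlim from the base class library, and it blocks the calling thread while closed, which is the thread pool cost described above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gate shared by all partition handlers in one processor instance.
public sealed class ProcessingGate : IDisposable
{
    // Starts signaled, meaning processing is allowed.
    private readonly ManualResetEventSlim _open = new ManualResetEventSlim(initialState: true);

    public bool IsOpen => _open.IsSet;

    // Each handler calls this before doing work.
    // It blocks the calling thread for as long as the gate is closed.
    public void WaitUntilOpen(CancellationToken cancellationToken) =>
        _open.Wait(cancellationToken);

    // Called by the handler whose health check failed:
    // closes the gate, waits out the pause, then reopens it.
    public async Task CloseForAsync(TimeSpan pause, CancellationToken cancellationToken)
    {
        _open.Reset();
        try
        {
            await Task.Delay(pause, cancellationToken);
        }
        finally
        {
            // Reopen even when the delay is canceled so no handler stays blocked.
            _open.Set();
        }
    }

    public void Dispose() => _open.Dispose();
}
```

    A handler would call WaitUntilOpen(eventArgs.CancellationToken) first and await CloseForAsync(...) when its health check fails. One simplification to be aware of: if two handlers close the gate at nearly the same time, the first one to finish its delay reopens it for everyone.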

    Another approach would be to set a volatile flag when your health check fails and unset it after your desired delay. Your processing handler would check this flag when executing and, if it is set, apply a delay before moving forward. This is a bit trickier to implement correctly, less clear for readers, and less precise on timing. It does come with a lower performance penalty when the system is healthy and does not require a synchronous primitive.
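    A rough sketch of the flag-based variant follows. PauseFlag and its members are hypothetical names, and the polling interval is an assumption; a handler can resume up to one interval late, which is the timing imprecision mentioned above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical pause flag shared by all partition handlers in one processor instance.
public sealed class PauseFlag
{
    // volatile so a write from one handler is visible to the others.
    private volatile bool _paused;

    public bool IsPaused => _paused;

    // Called by the handler whose health check failed.
    public async Task PauseForAsync(TimeSpan pause, CancellationToken cancellationToken)
    {
        _paused = true;
        try
        {
            await Task.Delay(pause, cancellationToken);
        }
        finally
        {
            // Clear the flag even when the delay is canceled.
            _paused = false;
        }
    }

    // Called by every handler before doing work. No thread is blocked:
    // the handler yields with Task.Delay and checks the flag again afterwards.
    public async Task WaitWhilePausedAsync(TimeSpan pollInterval, CancellationToken cancellationToken)
    {
        while (_paused)
        {
            await Task.Delay(pollInterval, cancellationToken);
        }
    }
}
```

    When the system is healthy, the handler's cost is a single volatile read. As with any single shared flag, overlapping failures from several partitions are not tracked individually; the first pause to expire clears the flag for all of them.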