I'm implementing generic consumers for a Service Bus in a .NET 8 Web API application, and the IServiceProvider instance turns out to be disposed when I try to resolve dependencies inside the consumer. How can I make sure the IServiceProvider stays valid inside the consumer, so that it can resolve dependencies such as the event processor strategy?
Context: What I am trying to achieve is a reusable and generic approach for subscribing to a Service Bus topic across multiple microservices. Each microservice should be able to easily implement this generic class and provide its own strategy for processing the received events.
I am using a hosted service, PostConsumerHostedService, which is responsible for triggering the consumer logic.
The problem arises when attempting to use the IServiceProvider within the consumer, as it is found to be disposed during runtime.
Firstly, here's the PostConsumerHostedService that triggers the consumer:
public class PostConsumerHostedService : CronJobServiceBase
{
private readonly IServiceProvider _service;
public PostConsumerHostedService(
IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
ILogger<CronJobServiceBase> log,
IServiceProvider service)
: base(postConsumerHostedServiceSettings, log)
{
_service = service;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
using var scope = _service.CreateAsyncScope();
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
}
Next, the PostConsumerTaskService which calls the IServiceBusConsumer:
public interface IPostConsumerTaskService : ITaskService, IHandleServiceBusMessage
{
}
public class PostConsumerTaskService : IPostConsumerTaskService
{
private readonly IServiceBusConsumer _serviceBusConsumer;
private readonly IServiceProvider _serviceProvider;
public PostConsumerTaskService(
IServiceBusConsumer serviceBusConsumer,
IServiceProvider serviceProvider)
{
_serviceBusConsumer = serviceBusConsumer;
_serviceProvider = serviceProvider;
}
public async Task HandleStringMessageAsync(string message)
{
var jsonDocument = JsonDocument.Parse(message);
// Extract the value of the "EventType" field
if (jsonDocument.RootElement.TryGetProperty("EventType", out JsonElement typeElement))
{
string type = typeElement.GetString();
Console.WriteLine("Type: " + type);
Enum.TryParse(type, out EventType myEvent);
var eventStrategy = await _eventProcessorFactory.GetEventProcessorStrategy(_serviceProvider,myEvent);
eventStrategy.ProcessEvent(message);
}
else
{
Console.WriteLine("Type field not found in JSON.");
}
await Task.CompletedTask;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusConsumer.StartAsync<LikeCreated>(this, cancellationToken);
}
}
And here's the ServiceBusConsumerBase, where the issue occurs when trying to retrieve the event processor strategy:
public abstract class ServiceBusConsumerBase: IServiceBusConsumer
{
private readonly ServiceBusConsumerSettingsBase _serviceBusConsummerSettings;
public ServiceBusConsumerBase(IOptions<ServiceBusConsumerSettingsBase> serviceBusConsummerSettings)
{
_serviceBusConsummerSettings = serviceBusConsummerSettings.Value;
}
public async Task StartAsync<T>(IHandleServiceBusMessage handleServiceBusMessage,CancellationToken cancellationToken)
{
var serviceBusClient = new ServiceBusClient(_serviceBusConsummerSettings.ConnectionString);
var processor = serviceBusClient.CreateProcessor(_serviceBusConsummerSettings.Topic, _serviceBusConsummerSettings.Subscription, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1
});
// handle received messages
//processor.ProcessMessageAsync += args => MessageHandler<T>(args,handleServiceBusMessage);
processor.ProcessMessageAsync += args => MessageStringHandler(args, handleServiceBusMessage);
processor.ProcessErrorAsync += ErrorHandler;
// Start processing
await processor.StartProcessingAsync(cancellationToken);
Console.WriteLine("Task running");
}
// handle received messages
async Task MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage)
{
var body = args.Message.Body.ToString();
Console.WriteLine($"Received: imageFileName {body}");
await handleServiceBusMessage.HandleStringMessageAsync(body);
// complete the message. messages are deleted from the subscription.
await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
}
The error:
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ThrowHelper.ThrowObjectDisposedException()
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at PostService.Factories.EventProcessorFactory.GetEventProcessorStrategy(IServiceProvider serviceProvider, EventType eventType) in /Users/dmata/Projects/InstagramSD/PostService/Factories/EventProcessorFactory.cs:line 31
at PostService.TaskServices.LikeTopicConsumerTaskService.HandleStringMessageAsync(String message) in /Users/dmata/Projects/InstagramSD/PostService/TaskServices/LikeTopicConsumerTaskService.cs:line 54
at Common.ServiceBus.ServiceBusConsumerBase.MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage) in /Users/dmata/Projects/InstagramSD/Common/ServiceBus/ServiceBusConsumerBase.cs:line 59
at Azure.Messaging.ServiceBus.ServiceBusProcessor.OnProcessMessageAsync(ProcessMessageEventArgs args)
at Azure.Messaging.ServiceBus.ReceiverManager.OnMessageHandler(EventArgs args)
When you initially "start" the operation here:
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
using var scope = _service.CreateAsyncScope();
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
Notice how you call StartAsync and then dispose of the scope variable as soon as the method returns. The problem is that StartAsync only starts the processor: the actual message handlers run later, and they go back to the IServiceProvider. That provider was disposed along with the scope, so resolving from it throws.
One way to work around this is to not dispose of the scope there: store it in a field and implement IAsyncDisposable on your class so the scope is disposed later.
Something like this, for example, would probably work:
public class PostConsumerHostedService : CronJobServiceBase, IAsyncDisposable
{
private readonly IServiceProvider _service;
// Nullable because no scope exists until ExecuteTaskAsync has run.
private AsyncServiceScope? _scope;
public PostConsumerHostedService(
IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
ILogger<CronJobServiceBase> log,
IServiceProvider service)
: base(postConsumerHostedServiceSettings, log)
{
_service = service;
}
public async ValueTask DisposeAsync()
{
// AsyncServiceScope is a struct, so unwrap the nullable before disposing it.
if (_scope is { } scope)
{
await scope.DisposeAsync();
_scope = null;
}
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
// Keep the scope alive past this method so the message handlers can still resolve services.
var scope = _service.CreateAsyncScope();
_scope = scope;
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
}
Just keep in mind that this design has a flaw: if ExecuteTaskAsync is called more than once on the same instance, the stored scope is overwritten and the earlier one is never disposed.
Having said that, probably the cleanest way to fully resolve this issue is to make sure you don't reach out to the container again (using IServiceProvider) after you first resolve the postTaskService: the problem would also go away that way.
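As a rough illustration of not going back to the container, the factory could receive every strategy through its constructor and look them up from a dictionary afterwards. This is only a sketch: SampleEventType, IEventStrategy, LikeCreatedStrategy and EagerEventProcessorFactory are hypothetical names, not the types from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the question's event type enum and strategy contract.
public enum SampleEventType { LikeCreated, LikeDeleted }

public interface IEventStrategy
{
    SampleEventType EventType { get; }
    void ProcessEvent(string message);
}

// Hypothetical strategy, included only so the sketch is complete.
public sealed class LikeCreatedStrategy : IEventStrategy
{
    public SampleEventType EventType => SampleEventType.LikeCreated;
    public void ProcessEvent(string message) => Console.WriteLine($"Like created: {message}");
}

public sealed class EagerEventProcessorFactory
{
    private readonly IReadOnlyDictionary<SampleEventType, IEventStrategy> _strategies;

    // The container supplies every registered IEventStrategy once, at construction time,
    // so no IServiceProvider is needed when a message arrives.
    public EagerEventProcessorFactory(IEnumerable<IEventStrategy> strategies)
    {
        _strategies = strategies.ToDictionary(s => s.EventType);
    }

    public IEventStrategy GetStrategy(SampleEventType eventType) =>
        _strategies.TryGetValue(eventType, out var strategy)
            ? strategy
            : throw new InvalidOperationException($"No strategy registered for {eventType}.");
}
```

One caveat with this approach: instances resolved from a scope are still tied to that scope, so a strategy that is itself disposable (or holds something disposable, such as a DbContext) would still break once the scope is disposed.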
Now I will say this though: I think you should reconsider your design here. Instead of creating a scope in the hosted service, it is a better design to create one scope for each message that needs processing: when a message arrives, create a scope for it from the main IServiceProvider (or just inject IServiceScopeFactory). From this scope, resolve the message processor, then have it process the message. Once this is done, you can dispose of the scope.
This design better models the existing "scope per request" pattern. It is similar to what happens in ASP.NET Core, for example, but instead of HTTP requests you are dealing with Service Bus messages.
One scope per message also avoids issues such as keeping instances for too long or inadvertently sharing them across handlers. For example, if you were to use a single DbContext from EF Core, you would start to see many problems such as high memory usage and concurrency issues. It is best to always have a single instance per "processing unit" when dealing with classes like that.
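The scope-per-message idea could look roughly like the sketch below. It assumes the Microsoft.Extensions.DependencyInjection package; IMessageHandler, ConsoleMessageHandler and ScopedMessageDispatcher are hypothetical names made up for illustration, and a real handler would be whatever scoped service each microservice registers.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical contract for the per-message processing logic, registered as scoped.
public interface IMessageHandler
{
    Task HandleAsync(string body, CancellationToken cancellationToken);
}

// Hypothetical handler; a real one would take scoped dependencies such as a DbContext.
public sealed class ConsoleMessageHandler : IMessageHandler
{
    public Task HandleAsync(string body, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Handled: {body}");
        return Task.CompletedTask;
    }
}

// Safe to register as a singleton: it holds only the scope factory, never a scope.
public sealed class ScopedMessageDispatcher
{
    private readonly IServiceScopeFactory _scopeFactory;

    public ScopedMessageDispatcher(IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
    }

    public async Task DispatchAsync(string body, CancellationToken cancellationToken = default)
    {
        // One scope per message; it is disposed as soon as handling finishes.
        await using var scope = _scopeFactory.CreateAsyncScope();
        var handler = scope.ServiceProvider.GetRequiredService<IMessageHandler>();
        await handler.HandleAsync(body, cancellationToken);
    }
}

// Possible wiring in the consumer (assumes Azure.Messaging.ServiceBus):
// processor.ProcessMessageAsync += async args =>
// {
//     await dispatcher.DispatchAsync(args.Message.Body.ToString(), args.CancellationToken);
//     await args.CompleteMessageAsync(args.Message);
// };
```

With this shape the hosted service no longer needs to create or hold a scope at all, because nothing resolved during startup depends on one.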