I have an ASP.NET Core application in which I am trying to listen to messages coming from RabbitMQ. To do this, I added a HostedService in which a listener waits to receive messages. When a message comes in, I want to save it to the database using Entity Framework Core. The problem is that the DbContext is disposed before I can save the message. Normal HTTP requests can access the DbContext just fine; only messages handled by the HostedService have this issue.
To listen to messages from RabbitMQ, I added a HostedService called RabbitMqConsumer.
Program.cs
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.UseStartup<Startup>();
        })
        .ConfigureServices(services =>
        {
            services.AddHostedService<RabbitMqConsumer>();
        });
The implementation of RabbitMqConsumer is shown below:
public class RabbitMqConsumer : BackgroundService
{
    public readonly IServiceScopeFactory _serviceScopeFactory;

    public RabbitMqConsumer(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            await Process(scope.ServiceProvider, stoppingToken);
        }
    }

    private async Task Process(IServiceProvider provider, CancellationToken stoppingToken)
    {
        var queueListenerService =
            provider.GetRequiredService<IMachineLearningQueueListener>();
        await queueListenerService.StartAsync(stoppingToken);
    }
}
The consumer starts listening to the queue through MachineLearningQueueListener, which is implemented as follows:
public IDocumentService _documentService { get; set; }

public MachineLearningQueueListener(IDocumentService documentService)
{
    _documentService = documentService;
    factory = new ConnectionFactory() { HostName = Host, Port = Port };
    connection = factory.CreateConnection();
    channel = connection.CreateModel();
}

public async Task StartAsync(CancellationToken cancellationToken)
{
    await StartListeningToQueue();
}

public async Task StopAsync(CancellationToken cancellationToken)
{
    connection.Close();
}

private async Task StartListeningToQueue()
{
    channel.QueueDeclare(queue: "ClassificationResult",
                         durable: false,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var classificationConsumer = new EventingBasicConsumer(channel);
    classificationConsumer.Received += (model, ea) =>
    {
        ConsumeClassificationResult(ea.Body.ToArray());
    };

    channel.BasicConsume(queue: "ClassificationResult",
                         autoAck: true,
                         consumer: classificationConsumer);
}
Dependency injection is set up like this in Startup.cs:
public void ConfigureServices(IServiceCollection services)
{
    services.AddScoped<IMachineLearningQueueListener, MachineLearningQueueListener>();
    services.AddScoped<IDocumentService, DocumentService>();
    services.AddScoped<IDocumentRepository, DocumentRepository>();
    services.AddDbContext<DocumentContext>(options =>
        options.UseSqlServer(Configuration.GetConnectionString("DocumentContext")));
}
I am using a repository pattern, so the listener uses the DocumentService, the service uses the DocumentRepository, and the repository uses the DbContext.
The error that is thrown is an ObjectDisposedException.
Any help in resolving this issue is greatly appreciated.
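The solution was to add Console.ReadLine() at the end of the StartListeningToQueue function to prevent it from returning. Without it, StartAsync completes immediately, ExecuteAsync leaves its using block, and the scope (together with the scoped DbContext) is disposed before any message arrives. The call does not block the event-based callbacks of the EventingBasicConsumer, so messages are still received.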
It is documented in RabbitMQ's tutorial for a console application here: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
I don't know if there is a cleaner solution for this but it works for now.
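A possibly cleaner alternative, sketched here under assumptions rather than tested against this project, is to keep the scope open by awaiting the host's stopping token instead of waiting for console input. The interface shape shown for IMachineLearningQueueListener is assumed from the StartAsync and StopAsync methods above; everything else uses the types already in the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Assumed shape of the listener interface, based on the methods shown in the question.
public interface IMachineLearningQueueListener
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}

public class RabbitMqConsumer : BackgroundService
{
    private readonly IServiceScopeFactory _serviceScopeFactory;

    public RabbitMqConsumer(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // The scope (and the scoped DbContext resolved from it) lives until this block exits.
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            var listener = scope.ServiceProvider
                .GetRequiredService<IMachineLearningQueueListener>();
            await listener.StartAsync(stoppingToken);

            try
            {
                // Wait here without blocking a thread until the host requests shutdown.
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Expected when the application is stopping.
            }

            // Close the RabbitMQ connection before the scope is disposed.
            await listener.StopAsync(CancellationToken.None);
        }
    }
}
```

With this variant the Console.ReadLine() call in StartListeningToQueue would no longer be needed. One caveat: a single long-lived scope means every message shares the same DbContext instance, which is not thread-safe and keeps tracking entities over time, so creating a fresh scope per message inside the Received handler may be worth considering.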