Tags: c#, asp.net-mvc, asp.net-web-api, rabbitmq

Merging File Upload and Template Creation Messages in ASP.NET Web API with RabbitMQ


I am developing an application using ASP.NET Web API and Angular with two main functionalities:

  1. File Upload: Users can upload files, and upon successful upload, the server generates URLs for these files. I need to publish these URLs to a RabbitMQ queue.

  2. Template Creation: Users can create templates, which also need to be published to another RabbitMQ queue.

Problem: I want to merge the messages from the two queues (the file URLs and template details) before processing them together. The final goal is to send emails that include the uploaded files as attachments whenever a template is created.

Questions:

  1. What is the best approach to merge messages from the two RabbitMQ queues reliably?

  2. Should I use a dedicated processing service that consumes messages from both queues, or is there a better strategy?

  3. How can I ensure that the emails are sent only after both messages have been successfully merged?

Any advice or examples would be greatly appreciated!

using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
using Microsoft.Extensions.DependencyInjection; // needed for CreateScope and GetRequiredService
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class AlertMessageConsumerService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQSetting _rabbitMqSetting;
    private readonly ILogger<AlertMessageConsumerService> _logger;

    private IConnection _connection;
    private IModel _channel;

    public AlertMessageConsumerService(IOptions<RabbitMQSetting> rabbitMqSetting, IServiceProvider serviceProvider, ILogger<AlertMessageConsumerService> logger)
    {
        _rabbitMqSetting = rabbitMqSetting.Value;
        _serviceProvider = serviceProvider;
        _logger = logger;

        var factory = new ConnectionFactory
        {
            HostName = _rabbitMqSetting.HostName,
            UserName = _rabbitMqSetting.UserName,
            Password = _rabbitMqSetting.Password
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        StartConsuming("alertQueue", stoppingToken);
        StartConsuming("anotherQueue", stoppingToken);

        await Task.CompletedTask;
    }

    private void StartConsuming(string queueName, CancellationToken cancellationToken)
    {
        _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            bool processedSuccessfully = false;
            try
            {
                processedSuccessfully = await ProcessMessageAsync(message);
            }
            catch (Exception ex)
            {
                // Pass the exception as the first argument so the logger records the stack trace.
                _logger.LogError(ex, "Exception occurred while processing message from queue {QueueName}", queueName);
            }

            if (processedSuccessfully)
            {
                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            else
            {
                _channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
            }
        };

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

    private async Task<bool> ProcessMessageAsync(string message)
    {
        try
        {
            using (var scope = _serviceProvider.CreateScope())
            {
                var emailService = scope.ServiceProvider.GetRequiredService<IEmailSender>();
                var vsoService = scope.ServiceProvider.GetRequiredService<IVsoService>();
                var alertTypeService = scope.ServiceProvider.GetRequiredService<IAlertTypeService>();
                var s3Service = scope.ServiceProvider.GetRequiredService<IFileService>();

                var alertMessage = JsonConvert.DeserializeObject<AlertMessage>(message);
                var uploadMessage = JsonConvert.DeserializeObject<FileUploadMessage>(message);

                if (alertMessage != null && uploadMessage != null)
                {
                    _logger.LogInformation($"AlertMessage: {JsonConvert.SerializeObject(alertMessage)}");
                    _logger.LogInformation($"FileUploadMessage: {JsonConvert.SerializeObject(uploadMessage)}");

                    return true;
                }
                else
                {
                    _logger.LogWarning("Both AlertMessage and FileUploadMessage must be present in the message.");
                    return false;
                }
            }
        }
        catch (JsonException jsonEx)
        {
            _logger.LogError(jsonEx, "JSON error processing message");
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
            return false;
        }
    }

    public override void Dispose()
    {
        _channel.Close();
        _connection.Close();
        base.Dispose();
    }
}

Solution

  • If you already have the infrastructure for background processing in place, a dedicated process for this task sounds appropriate. In general, once you move into the space of coordinating asynchronous background processes, Enterprise Integration Patterns is an invaluable resource. Don't be fooled by the name; it's really about all kinds of asynchronous processing patterns, not just enterprise integration.

    In this case, it sounds like you need an Aggregator. Produce a correlation ID that you associate with both messages, and have your Aggregator wait for both kinds of messages with the same correlation ID. Once you have both messages, send your e-mail.

    Some frameworks (most notably NServiceBus) call this a Saga, so you may also want to look into that. Here's their documentation example: https://docs.particular.net/tutorials/nservicebus-sagas/1-saga-basics/

    (For the record: I'm not affiliated with NServiceBus.)
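The Aggregator described above can be sketched roughly as follows. This is a minimal in-memory illustration, not a production implementation: the `TemplateCreated` and `FilesUploaded` records, the `EmailAggregator` class, and the `onComplete` callback are all hypothetical names invented for this example (the question does not show the real `AlertMessage` and `FileUploadMessage` shapes). It assumes that whoever publishes the two messages stamps both with the same correlation ID, for example in the message body or in the RabbitMQ `CorrelationId` message property.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shapes; both carry the same correlation ID.
public record TemplateCreated(string CorrelationId, string TemplateName);
public record FilesUploaded(string CorrelationId, IReadOnlyList<string> FileUrls);

// Holds whichever half arrives first and fires the callback once
// the matching half with the same correlation ID shows up.
public sealed class EmailAggregator
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, TemplateCreated> _templates = new();
    private readonly Dictionary<string, FilesUploaded> _files = new();
    private readonly Action<TemplateCreated, FilesUploaded> _onComplete;

    public EmailAggregator(Action<TemplateCreated, FilesUploaded> onComplete)
    {
        _onComplete = onComplete;
    }

    // Returns true if this message completed a pair.
    public bool Handle(TemplateCreated template)
    {
        FilesUploaded files;
        lock (_gate)
        {
            // If the upload half is not here yet, park the template and wait.
            if (!_files.Remove(template.CorrelationId, out files))
            {
                _templates[template.CorrelationId] = template;
                return false;
            }
        }
        _onComplete(template, files); // invoked outside the lock
        return true;
    }

    // Returns true if this message completed a pair.
    public bool Handle(FilesUploaded files)
    {
        TemplateCreated template;
        lock (_gate)
        {
            // If the template half is not here yet, park the upload and wait.
            if (!_templates.Remove(files.CorrelationId, out template))
            {
                _files[files.CorrelationId] = files;
                return false;
            }
        }
        _onComplete(template, files); // invoked outside the lock
        return true;
    }
}
```

In a consumer like the one in the question, each queue's handler would deserialize only its own message type and pass it to the matching `Handle` overload, and the callback would send the e-mail. Because this sketch keeps pending messages in memory, they are lost if the process restarts after a message has been acknowledged. A real aggregator would probably keep its state in a durable store and expire pairs that never complete, which is much of what a Saga framework provides.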