MassTransit and RabbitMQ in a Console Application (with an ASP.NET Core Web API)


I created a solution with an ASP.NET Core Web API project, some class libraries (Domain, DI, etc.), and a console application. I use the console application as a RabbitMQ consumer with the MassTransit library; it should take messages from RabbitMQ. (I also have a Producer project, and it sends messages to RabbitMQ without problems.)

My console application looks like this. Program.cs:

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.AddConsumer<MessageConsumer>();

                    x.UsingRabbitMq((context, cfg) =>
                    {
                        var connectionString = new Uri("RabbitMQ_URL");
                        cfg.Host(connectionString);

                        cfg.ConfigureEndpoints(context);
                    });
                });
                services.AddMassTransitHostedService(true);

                services.AddHostedService<Worker>();
            });
}

With Worker.cs:

public class Worker : BackgroundService
{
    readonly IBus _bus;

    public Worker(IBus bus)
    {
        _bus = bus;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var factory = new ConnectionFactory()
            {
                Uri = new Uri("RabbitMQ_URL"),
                DispatchConsumersAsync = true
            };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "MessageQueue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);

                    var @event = JsonConvert.DeserializeObject<Event>(message);

                    await _bus.Publish(new Event { DataJson = @event });

                    await Task.Yield();
                };

                channel.BasicConsume(queue: "MessageQueue",
                                     autoAck: true,
                                     consumer: consumer);

                _logger.LogInformation("Received Text: {Text}", context.Message.DataJson);

                return Task.CompletedTask;
            }
        }
    }
}

MessageConsumer.cs:

public class MessageConsumer :
    IConsumer<Event>
{
    readonly ILogger<MessageConsumer> _logger;

    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Event> context)
    {
        _logger.LogInformation("Received Text: {Text}", context.Message.DataJson);

        return Task.CompletedTask;
    }
}

And my Event.cs:

public class Event
{
    public ServiceType ServiceType { get; set; }
    public string DataJson { get; set; }
}

public enum ServiceType
{
    ComplareSitter
}

Please help me. Thanks a lot.


Solution

  • You might start with a clean and simple worker service using one of the MassTransit templates, just to verify your setup and configuration. There is a video available showing how to set up and use the templates.

    But an obvious question: why on earth are you connecting to RabbitMQ and creating a basic consumer inside the Consume method? The message has already been deserialized as your Event type and is ready to be used. There is absolutely no need to use any part of the RabbitMQ Client library in your application when using MassTransit.
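
As a rough illustration of that point, here is a minimal sketch of the console application with the Worker and every RabbitMQ.Client call removed, so that MassTransit alone receives the messages. It reuses the names from the question (Event, MessageConsumer, the "RabbitMQ_URL" placeholder) and assumes the same MassTransit version as the question, where AddMassTransitHostedService is still available; newer versions should not need that call. It also assumes the producer publishes Event through MassTransit with the same message type, which the question does not confirm.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Trimmed copy of the message contract from the question.
public class Event
{
    public string DataJson { get; set; }
}

public class MessageConsumer : IConsumer<Event>
{
    readonly ILogger<MessageConsumer> _logger;

    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        _logger = logger;
    }

    // By the time Consume is called, MassTransit has already received the
    // message from RabbitMQ and deserialized it into an Event instance.
    public Task Consume(ConsumeContext<Event> context)
    {
        _logger.LogInformation("Received Text: {Text}", context.Message.DataJson);
        return Task.CompletedTask;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.AddConsumer<MessageConsumer>();

                    x.UsingRabbitMq((context, cfg) =>
                    {
                        // Placeholder from the question; replace it with a real
                        // broker URI, otherwise new Uri(...) throws at startup.
                        cfg.Host(new Uri("RabbitMQ_URL"));

                        // Creates a receive endpoint for each registered consumer.
                        cfg.ConfigureEndpoints(context);
                    });
                });

                // Starts and stops the bus together with the host.
                services.AddMassTransitHostedService(true);
            });
}
```

With this layout there is no Worker at all: the hosted bus keeps the process alive, and each incoming Event ends up in MessageConsumer.Consume.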