Search code examples
rabbitmqproducer-consumermasstransitasp.net-core-2.1

MassTransit (RabbitMq) + Asp.Net Core 2.1: Multiple consumers in the same project


I've got an Asp.Net 2.1 project that acts as a service host for consuming messages published by other processes/applications. I've setup/configured multiple consumers in the Startup class (Startup.cs) as below (only MassTransit portion is given here for brevity):

public void ConfigureServices(IServiceCollection services)
{   
    services.AddScoped<SendMessageConsumer>();
    services.AddScoped<AnotherMessageConsumer>();
    services.AddMassTransit(c =>
    {
        c.AddConsumer<SendMessageConsumer>();
        c.AddConsumer<AnotherMessageConsumer>();
    });

    services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host("localhost", "/", h => { });

        cfg.ReceiveEndpoint(host, "Queue-1", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));

            e.LoadFrom(provider);
            e.Consumer<SendMessageConsumer>();
            EndpointConvention.Map<Message>(e.InputAddress);                    
        });

        cfg.ReceiveEndpoint(host, "Queue-2", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));

            e.LoadFrom(provider);
            e.Consumer<AnotherMessageConsumer>();
            EndpointConvention.Map<AnotherMessage>(e.InputAddress);
        });
    }));

    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IHostedService, BusService>();   
}

Messages:

namespace MasstransitDemo.Models
{
    public class Message
    {
        public string Value { get; set; }
    }

    public class AnotherMessage
    {
        public string Value { get; set; }
    }
}

Consumers:

public class SendMessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        Console.WriteLine($"Receive message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

public class AnotherMessageConsumer : IConsumer<AnotherMessage>
{
    public Task Consume(ConsumeContext<AnotherMessage> context)
    {
        Console.WriteLine($"Receive another message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

This causes both messages to come through to each queue. See the resulting RabbitMq exchanges below:

enter image description here enter image description here

How do I set it up to make SendMessageConsumer to receive only "Message" and AnotherMessageConsumer to receive "AnotherMessage"?

Thanks in advance.


Solution

  • You tell MassTransit about your consumer explicitly, but also load all consumers from the container, for each endpoint.

    e.LoadFrom(provider);
    e.Consumer<AnotherMessageConsumer>();
    

    By doing this, you all both consumers to each endpoint with LoadFrom, plus one consumer in addition by Consumer<T>. So each of your endpoints gets three consumers, and you get both queues bound to both exchanges.

    You don't need to use LoadFromContainer here. If your consumer has a dependency that needs to be resolved by the container, you can use this:

    e.Consumer<AnotherMessageConsumer>(container);