I've got an ASP.NET Core 2.1 project that acts as a service host for consuming messages published by other processes/applications. I've set up multiple consumers in the Startup class (Startup.cs) as below (only the MassTransit portion is shown, for brevity):
public void ConfigureServices(IServiceCollection services)
{
    services.AddScoped<SendMessageConsumer>();
    services.AddScoped<AnotherMessageConsumer>();
    services.AddMassTransit(c =>
    {
        c.AddConsumer<SendMessageConsumer>();
        c.AddConsumer<AnotherMessageConsumer>();
    });
    services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host("localhost", "/", h => { });
        cfg.ReceiveEndpoint(host, "Queue-1", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));
            e.LoadFrom(provider);
            e.Consumer<SendMessageConsumer>();
            EndpointConvention.Map<Message>(e.InputAddress);
        });
        cfg.ReceiveEndpoint(host, "Queue-2", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));
            e.LoadFrom(provider);
            e.Consumer<AnotherMessageConsumer>();
            EndpointConvention.Map<AnotherMessage>(e.InputAddress);
        });
    }));
    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IHostedService, BusService>();
}
Messages:
namespace MasstransitDemo.Models
{
    public class Message
    {
        public string Value { get; set; }
    }

    public class AnotherMessage
    {
        public string Value { get; set; }
    }
}
Consumers:
public class SendMessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        Console.WriteLine($"Receive message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

public class AnotherMessageConsumer : IConsumer<AnotherMessage>
{
    public Task Consume(ConsumeContext<AnotherMessage> context)
    {
        Console.WriteLine($"Receive another message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}
This causes both message types to come through to each queue; in RabbitMQ, each queue ends up bound to the exchanges of both message types.
How do I set it up so that SendMessageConsumer receives only "Message" and AnotherMessageConsumer receives only "AnotherMessage"?
Thanks in advance.
You tell MassTransit about your consumer explicitly, but also load all consumers from the container, for each endpoint.
e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();
By doing this, you add both consumers to each endpoint with LoadFrom, plus one more consumer with Consumer<T>. So each of your endpoints gets three consumers, and you get both queues bound to both exchanges.
You don't need to use LoadFrom here. If your consumer has a dependency that needs to be resolved by the container, you can use this:
e.Consumer<AnotherMessageConsumer>(container);
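Applied to the configuration in the question, this might look like the sketch below. It assumes MassTransit 5.x with the MassTransit.RabbitMQ and MassTransit.Extensions.DependencyInjection packages, where a Consumer<T> overload taking an IServiceProvider should be available. Here provider plays the role of container above, and the message types, consumers and BusService are the ones from the question.

```csharp
using MassTransit;
using MasstransitDemo.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        // Container registrations are left as in the question.
        services.AddScoped<SendMessageConsumer>();
        services.AddScoped<AnotherMessageConsumer>();
        services.AddMassTransit(c =>
        {
            c.AddConsumer<SendMessageConsumer>();
            c.AddConsumer<AnotherMessageConsumer>();
        });

        services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host("localhost", "/", h => { });

            cfg.ReceiveEndpoint(host, "Queue-1", e =>
            {
                e.PrefetchCount = 16;
                e.UseMessageRetry(x => x.Interval(2, 100));
                // No LoadFrom here: only SendMessageConsumer is attached to
                // this endpoint, and it is resolved through the container.
                e.Consumer<SendMessageConsumer>(provider);
                EndpointConvention.Map<Message>(e.InputAddress);
            });

            cfg.ReceiveEndpoint(host, "Queue-2", e =>
            {
                e.PrefetchCount = 16;
                e.UseMessageRetry(x => x.Interval(2, 100));
                // Likewise, only AnotherMessageConsumer on this endpoint.
                e.Consumer<AnotherMessageConsumer>(provider);
                EndpointConvention.Map<AnotherMessage>(e.InputAddress);
            });
        }));

        services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<IHostedService, BusService>();
    }
}
```

With one consumer per endpoint, Queue-1 should end up bound only to the Message exchange and Queue-2 only to the AnotherMessage exchange. Bindings created by the earlier configuration stay on the broker, so the old ones may need to be removed (for example by deleting the two queues and letting them be recreated) before the change becomes visible.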