I am trying to create a headers exchange in RabbitMQ with MassTransit, so a consumer only consumes messages from a specific queue based on headers in the message.
I configured my producer:
builder.Services.AddMassTransit(mt =>
{
mt.SetKebabCaseEndpointNameFormatter();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.Publish<OrderSubmitted>(p =>
{
p.ExchangeType = "headers";
});
});
});
This is my consumer configuration:
builder.Services.AddMassTransit(mt =>
{
mt.AddConsumer<OrderPickupConsumer>();
mt.AddConsumer<OrderDeliveryConsumer>();
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("OrderPickup", re =>
{
re.ConfigureConsumer<OrderPickupConsumer>(context);
re.Bind<OrderSubmitted>(x =>
{
x.ExchangeType = "headers";
x.SetBindingArgument("headers", new Dictionary<string, string>
{
{ "Transport", "pickup" },
{ "x-match", "all" }
});
});
});
cfg.ReceiveEndpoint("OrderDelivery", re =>
{
re.ConfigureConsumer<OrderDeliveryConsumer>(context);
re.Bind<OrderSubmitted>(x =>
{
x.ExchangeType = "headers";
x.SetBindingArgument("headers", new Dictionary<string, string>
{
{ "Transport", "delivery" },
{ "x-match", "all" }
});
});
});
});
});
I publish the messages like this:
_bus.Publish<OrderSubmitted>(new
{
__Header_Transport = "pickup",
Product = "Pizza"
});
_bus.Publish<OrderSubmitted>(new
{
__Header_Transport = "delivery",
Product = "Burgers"
});
As far as I understand the setup above is correct, however when starting the receiving application an error is thrown:
ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity
I have a clean RabbitMQ instance without any previously existing queues and even when RabbitMQ is not started at all I also get the error.
Is there something wrong with my configuration? Or might this be a bug in MassTransit?
After a day of investigation I found that the error did not show when I added re.ConfigureConsumeTopology = false;
However, the message was not delivered to the queue. To fix this I changed the SetBindingArgument
configuration.
cfg.ReceiveEndpoint("OrderPickup", re =>
{
re.ConfigureConsumeTopology = false;
re.ConfigureConsumer<OrderPickupConsumer>(context);
re.Bind<OrderSubmitted>(x =>
{
x.ExchangeType = "headers";
x.SetBindingArgument("Transport", "pickup");
x.SetBindingArgument("x-match", "all");
});
});
cfg.ReceiveEndpoint("OrderDelivery", re =>
{
re.ConfigureConsumeTopology = false;
re.ConfigureConsumer<OrderDeliveryConsumer>(context);
re.Bind<OrderSubmitted>(x =>
{
x.ExchangeType = "headers";
x.SetBindingArgument("Transport", "delivery");
x.SetBindingArgument("x-match", "all");
});
});
Now the messages are sent to the correct queue, based on the header filter and pickup by the consumer.