How do I consume a message based on a header with MassTransit in RabbitMQ?


I am trying to use a headers exchange in RabbitMQ with MassTransit, so that each consumer's queue only receives the messages whose headers match its binding.

I configured my producer:

builder.Services.AddMassTransit(mt =>
{
    mt.SetKebabCaseEndpointNameFormatter();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.Publish<OrderSubmitted>(p =>
        {
            p.ExchangeType = "headers";
        });
    });
});

This is my consumer configuration:

builder.Services.AddMassTransit(mt =>
{
    mt.AddConsumer<OrderPickupConsumer>();
    mt.AddConsumer<OrderDeliveryConsumer>();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ReceiveEndpoint("OrderPickup", re =>
        {
            re.ConfigureConsumer<OrderPickupConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("headers", new Dictionary<string, string>
                {
                    { "Transport", "pickup" },
                    { "x-match", "all" }
                });
            });
        });

        cfg.ReceiveEndpoint("OrderDelivery", re =>
        {
            re.ConfigureConsumer<OrderDeliveryConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("headers", new Dictionary<string, string>
                {
                    { "Transport", "delivery" },
                    { "x-match", "all" }
                });
            });
        });
    });
});

I publish the messages like this:

await _bus.Publish<OrderSubmitted>(new
{
    __Header_Transport = "pickup",
    Product = "Pizza"
});

await _bus.Publish<OrderSubmitted>(new
{
    __Header_Transport = "delivery",
    Product = "Burgers"
});

As far as I understand, the setup above is correct. However, when the receiving application starts, it throws an error: ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity

I have a clean RabbitMQ instance with no previously existing queues, and I get the same error even when RabbitMQ is not running at all.

Is there something wrong with my configuration? Or might this be a bug in MassTransit?


Solution

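  • After a day of investigation, I found that the error no longer occurred once I added re.ConfigureConsumeTopology = false; to each receive endpoint.

    However, the messages were still not delivered to the queues. To fix this, I changed the SetBindingArgument configuration so that each header and x-match is passed as its own binding argument, instead of as a dictionary under a single "headers" argument: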

    cfg.ReceiveEndpoint("OrderPickup", re =>
    {
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderPickupConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            x.SetBindingArgument("Transport", "pickup");
            x.SetBindingArgument("x-match", "all");
        });
    });

    cfg.ReceiveEndpoint("OrderDelivery", re =>
    {
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderDeliveryConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            x.SetBindingArgument("Transport", "delivery");
            x.SetBindingArgument("x-match", "all");
        });
    });

    Now the messages are routed to the correct queue based on the header filter and picked up by the corresponding consumer.
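
To see why the flat binding arguments work, it can help to model how a headers exchange evaluates a binding. The sketch below is a simplified, in-memory approximation and not MassTransit or RabbitMQ code: Matches and the dictionaries are hypothetical names introduced for illustration, and it only handles string values with x-match set to all or any. As far as I can tell, the original configuration registered one binding argument literally named "headers", so the exchange never compared the message's Transport header at all.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample data: the binding arguments of the OrderPickup queue
// and the headers of the two published messages.
var pickupBinding = new Dictionary<string, string> { ["Transport"] = "pickup", ["x-match"] = "all" };
var pizza = new Dictionary<string, string> { ["Transport"] = "pickup" };
var burgers = new Dictionary<string, string> { ["Transport"] = "delivery" };

Console.WriteLine(Matches(pickupBinding, pizza));   // True
Console.WriteLine(Matches(pickupBinding, burgers)); // False

// Simplified model of headers-exchange matching (an approximation, see above).
static bool Matches(
    IReadOnlyDictionary<string, string> bindingArguments,
    IReadOnlyDictionary<string, string> messageHeaders)
{
    // "x-match" controls the comparison mode; it defaults to "all" when absent.
    var mode = bindingArguments.TryGetValue("x-match", out var m) ? m : "all";

    // Arguments starting with "x-" are not compared against message headers.
    var criteria = bindingArguments
        .Where(a => !a.Key.StartsWith("x-", StringComparison.Ordinal))
        .ToList();

    // A criterion is satisfied when the message carries the same key and value.
    bool Hit(KeyValuePair<string, string> c) =>
        messageHeaders.TryGetValue(c.Key, out var v) && v == c.Value;

    return mode == "any" ? criteria.Any(Hit) : criteria.All(Hit);
}
```

With x-match set to all, every non-"x-" binding argument must appear in the message headers with the same value, which is why each header has to be its own binding argument.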