Tags: c#, .net, masstransit, distributed-system, request-response

Multiple consumers for Request/Response in MassTransit


While reading the MassTransit documentation, I found the following statement: "Multiple consumers connected to the same receive endpoint are fine, requests will be load balanced across the connected consumers." However, this doesn't work for me. My configuration looks as follows:

builder.Services.AddMassTransit(configure =>
{
    configure.SetKebabCaseEndpointNameFormatter();
    configure.AddConsumer<ObjectCreatedEventHandlerFirst>()
        .Endpoint(x => { x.Name = "custom-endpoint"; });
    configure.AddConsumer<ObjectCreatedEventHandlerSecond>()
        .Endpoint(x => { x.Name = "custom-endpoint"; });
    configure.AddRequestClient<ObjectCreatedEvent>(new Uri("exchange:custom-endpoint"));
    configure.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(rabbitMqSettings.Host, h =>
        {
            h.Username(rabbitMqSettings.User);
            h.Password(rabbitMqSettings.Password);
        });
        cfg.ConfigureEndpoints(context);
    });
});

Request initiator:

    app.MapGet("/",
    async (IRequestClient<ObjectCreatedEvent> requestClient) =>
    {
        var response =
            await requestClient.GetResponse<ObjectDeletedEvent>(
                new(Guid.NewGuid(), "123"));
        return response.Message.Name;
    });

Consumers:

    public class ObjectCreatedEventHandlerFirst
        : IConsumer<ObjectCreatedEvent>
    {

        public async Task Consume(ConsumeContext<ObjectCreatedEvent> context)
        {
            await context.RespondAsync<ObjectDeletedEvent>(
                new(Guid.NewGuid(), "1"));
        }
    }

    public class ObjectCreatedEventHandlerSecond
        : IConsumer<ObjectCreatedEvent>
    {
        public async Task Consume(ConsumeContext<ObjectCreatedEvent> context)
        {
            await context.RespondAsync<ObjectDeletedEvent>(
                new(Guid.NewGuid(), "2"));
        }
    }

I thought that both consumers would compete for a message and that, eventually, only one would be executed. In reality, both of them are executed concurrently.

Hence my question: is there a mistake in my configuration, or is my entire assumption about the behaviour wrong?


Solution

  • Your assumption is wrong, or the documentation was misunderstood. Multiple service instances with the same consumer on the same receive endpoint (queue) will be load balanced.

    If you configure multiple MassTransit consumers of the same message type on a single queue, all of those consumers will consume each message concurrently. If you configure multiple MassTransit consumers of different message types on a single queue, the message is only consumed by the consumers of the specific message type.

    The behavior I think you're referencing above is scaling-out, which is different and refers to service instances.
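
    A minimal sketch of the scale-out setup described here, assuming only one consumer type for ObjectCreatedEvent is registered on custom-endpoint and the same service is then started more than once. All names are reused from the question's configuration; keeping ObjectCreatedEventHandlerFirst rather than the second handler is an arbitrary choice for illustration:

    ```csharp
    builder.Services.AddMassTransit(configure =>
    {
        configure.SetKebabCaseEndpointNameFormatter();

        // Register a single consumer type for ObjectCreatedEvent on this queue.
        // The second handler from the question is intentionally not registered.
        configure.AddConsumer<ObjectCreatedEventHandlerFirst>()
            .Endpoint(x => { x.Name = "custom-endpoint"; });

        configure.AddRequestClient<ObjectCreatedEvent>(new Uri("exchange:custom-endpoint"));

        configure.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(rabbitMqSettings.Host, h =>
            {
                h.Username(rabbitMqSettings.User);
                h.Password(rabbitMqSettings.Password);
            });
            cfg.ConfigureEndpoints(context);
        });
    });
    ```

    With this configuration, each running instance of the service attaches to the same custom-endpoint queue, so a given request should be handled by only one instance and only one response is sent back.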