Search code examples
amazon-sqs masstransit aws-sqs-fifo

MassTransit: Batch Consumer of an Amazon SQS FIFO queue


I'm trying to configure a batch consumer for an Amazon SQS FIFO queue without success: the consumer is always triggered one message at a time. I suspect this could be a limitation of FIFO queues, but I would appreciate feedback from others.

This is the code I'm using:

// Message:
/// <summary>
/// Empty message contract used to demonstrate batch consumption; it carries no data.
/// </summary>
public record BatchMessage
{
}

// Consumer:
/// <summary>
/// Consumer that receives <see cref="BatchMessage"/> instances as a batch and
/// reports how many messages each delivered batch contains.
/// </summary>
public class BatchMessageConsumer : IConsumer<Batch<BatchMessage>>
{
    /// <summary>
    /// Writes the size of the delivered batch to the debug output.
    /// </summary>
    /// <param name="context">Consume context whose message is the delivered batch.</param>
    /// <returns>An already-completed task; no asynchronous work is started here.</returns>
    public Task Consume(ConsumeContext<Batch<BatchMessage>> context)
    {
        // Diagnostic only: a value of 1 means the messages are arriving one at a time.
        // NOTE(review): presumably System.Diagnostics.Debug, so nothing is written in
        // builds without the DEBUG symbol — confirm against the file's usings.
        Debug.WriteLine($"Messages={context.Message.Length}");

        return Task.CompletedTask;
    }
}

// ConsumerDefinition:
/// <summary>
/// Consumer definition that places <see cref="BatchMessageConsumer"/> on an
/// explicitly named receive endpoint. No batch or transport options are set here.
/// </summary>
public class BatchMessageConsumerDefinition : ConsumerDefinition<BatchMessageConsumer>
{
    // SQS Queue name.
    private const string QueueName = "Demo-BatchMessage.fifo";

    /// <summary>
    /// Assigns the endpoint (queue) name used for this consumer.
    /// </summary>
    public BatchMessageConsumerDefinition()
    {
        Endpoint(endpoint =>
        {
            endpoint.Name = QueueName;
        });
    }
}

// Configuration:
services.AddMassTransit(registration =>
{
    // Register the consumers found in the assembly that contains Program.
    registration.AddConsumers(typeof(Program).Assembly);

    registration.UsingAmazonSqs((context, sqs) =>
    {
        // No host-level settings are customised; the callback is intentionally empty.
        sqs.Host("region", _ => { });

        // SNS Topic name.
        sqs.Message<BatchMessage>(topology => topology.SetEntityName("BatchMessage.fifo"));

        // Create the receive endpoints for the registered consumers.
        sqs.ConfigureEndpoints(context);
    });
});

// Trigger:
// Number of messages to publish for the demonstration.
const int messageCount = 10;

// Every message is published with this same group id.
var groupId = Guid.NewGuid().ToString();

// Publish sequentially: each publish is awaited before the next one starts.
for (var published = 0; published < messageCount; published++)
{
    await _publishEndpoint.Publish(new BatchMessage(), publishContext =>
    {
        // Required for FIFO messages.
        publishContext.TrySetGroupId(groupId);
        publishContext.TrySetDeduplicationId(publishContext.MessageId.ToString());
    });
}

Solution

  • You should update your consumer definition to set the batch options and to configure the transport for concurrent delivery of messages that share the same MessageGroupId:

    // ConsumerDefinition:
    /// <summary>
    /// Consumer definition for <see cref="BatchMessageConsumer"/>: names the receive
    /// endpoint, sets the batch options, and raises the concurrent delivery limit when
    /// the endpoint is an Amazon SQS receive endpoint.
    /// </summary>
    public class BatchMessageConsumerDefinition :
        ConsumerDefinition<BatchMessageConsumer>
    {
        // SQS Queue name.
        private const string QueueName = "Demo-BatchMessage.fifo";

        // Maximum number of messages collected into one batch.
        private const int BatchMessageLimit = 10;

        // Passed as the first argument of SetTimeLimit.
        // NOTE(review): presumably milliseconds — confirm against the BatchOptions API.
        private const int BatchTimeLimit = 1000;

        // NOTE(review): this is lower than BatchMessageLimit; confirm whether a batch
        // for a single message group can fill beyond this value before the time limit.
        private const int SqsConcurrentDeliveryLimit = 5;

        /// <summary>
        /// Assigns the endpoint (queue) name used for this consumer.
        /// </summary>
        public BatchMessageConsumerDefinition()
        {
            Endpoint(endpoint => endpoint.Name = QueueName);
        }

        /// <summary>
        /// Applies the batch options to the consumer and, for Amazon SQS receive
        /// endpoints only, sets the concurrent delivery limit.
        /// </summary>
        /// <param name="endpointConfigurator">Configurator of the receive endpoint hosting the consumer.</param>
        /// <param name="consumerConfigurator">Configurator of the consumer itself.</param>
        /// <param name="context">Registration context; not used by this override.</param>
        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<BatchMessageConsumer> consumerConfigurator, IRegistrationContext context)
        {
            consumerConfigurator.Options<BatchOptions>(batch =>
                batch.SetMessageLimit(BatchMessageLimit).SetTimeLimit(BatchTimeLimit));

            // The concurrent delivery setting is only exposed by the SQS configurator;
            // other endpoint types are left untouched.
            if (endpointConfigurator is not IAmazonSqsReceiveEndpointConfigurator sqsEndpoint)
            {
                return;
            }

            sqsEndpoint.ConcurrentDeliveryLimit = SqsConcurrentDeliveryLimit;
        }
    }