I'm trying to configure a batch consumer for an Amazon SQS FIFO queue with no success, the consumer is always triggered on a single message-basis. I guess this could be a limitation of FIFO queues, but it would be nice to get feedback from others.
This is the code I'm using:
// Message:
public record BatchMessage();
// Consumer:
public class BatchMessageConsumer : IConsumer<Batch<BatchMessage>>
{
public Task Consume(ConsumeContext<Batch<BatchMessage>> context)
{
Debug.WriteLine($"Messages={context.Message.Length}");
return Task.CompletedTask;
}
}
// ConsumerDefinition:
public class BatchMessageConsumerDefinition : ConsumerDefinition<BatchMessageConsumer>
{
public BatchMessageConsumerDefinition()
{
// SQS Queue name.
Endpoint(x => x.Name = $"Demo-BatchMessage.fifo");
}
}
// Configuration:
services.AddMassTransit(x =>
{
x.AddConsumers(typeof(Program).Assembly);
x.UsingAmazonSqs((context, sqs) =>
{
sqs.Host("region", (_) => { });
// SNS Topic name.
sqs.Message<BatchMessage>(x => x.SetEntityName("BatchMessage.fifo"));
sqs.ConfigureEndpoints(context);
});
});
// Trigger:
var groupId = Guid.NewGuid().ToString();
for (var i = 1; i <= 10; i++)
{
await _publishEndpoint.Publish(new BatchMessage(), (context) =>
{
// Required for FIFO messages.
context.TrySetGroupId(groupId);
context.TrySetDeduplicationId(context.MessageId.ToString());
});
}
You should update your consumer definition to configure the transport for concurrency delivery of the same MessageGroupId
:
// ConsumerDefinition:
public class BatchMessageConsumerDefinition :
ConsumerDefinition<BatchMessageConsumer>
{
public BatchMessageConsumerDefinition()
{
// SQS Queue name.
Endpoint(x => x.Name = $"Demo-BatchMessage.fifo");
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<BatchMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Options<BatchOptions>(o => o.SetMessageLimit(10).SetTimeLimit(1000));
if(endpointConfigurator is IAmazonSqsReceiveEndpointConfigurator cfg)
{
cfg.ConcurrentDeliveryLimit = 5;
}
}
}