I'm new to Masstransit and I have taken up a small test case to create pub /Sub with AWS SQS and in case of any exception consume the fault and perform the recoverable action .
Registration and Receive End point configuration
public void ConfigureServices(IServiceCollection services) { services.AddControllers();
services.AddMassTransit(config => {
config.AddConsumer<OrderConsumer>();
config.AddConsumer<OrderFaultConsumer>();
//config.AddConsumer<OrderAccepted>();
config.UsingAmazonSqs((ctx, cfg) => {
cfg.Host("ap-southeast-2",
h =>
{
h.AccessKey("AccessKey");
h.SecretKey("SecretKey");
});
cfg.ReceiveEndpoint("order-queue", c => {
//Don't create a new topic
c.ConfigureConsumeTopology = false;
c.Subscribe("Order", s =>
{
});
c.UseMessageRetry(r=>r.Immediate(3));
c.ConfigureConsumer<OrderConsumer>(ctx);
c.ConfigureConsumer<OrderFaultConsumer>(ctx);
});
services.AddMassTransitHostedService();
Sample Publishing code :
public async Task<IActionResult> Post([FromBody] OrderDto order)
{
await publishEndpoint.Publish<IOrder>(new Order()
{
Message="Fault",
order=order
});
return Ok();
}
OrderConsumer
public class OrderConsumer : IConsumer<IOrder>
{
private readonly ILogger<OrderConsumer> logger;
public OrderConsumer(ILogger<OrderConsumer> logger)
{
this.logger = logger;
}
/*
MassTransit delivers messages to consumers by calling the Consume method */
public async Task Consume(ConsumeContext<IOrder> context)
{
if(context.Message.Message=="Fault")
throw new Exception("Very bad things happened");
await Console.Out.WriteLineAsync(context.Message.Message);
}
}
OrderFaultConsumer
public class OrderFaultConsumer : IConsumer<Fault<IOrder>>
{
private readonly ILogger<OrderFaultConsumer> _logger;
public OrderFaultConsumer(ILogger<OrderFaultConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<Fault<IOrder>> context)
{
var originalMessage = context.Message.Message.order;
var exceptions = context.Message.Exceptions;
//Do something interesting
await Console.Out.WriteLineAsync($"discarding message:{JsonConvert.SerializeObject(originalMessage)}");
}
}
Expectation is :
Actual :
Please advise.
There is a lot to unpack here, so stay with me.
Correct, when a message is published, a topic is created based upon the published message type. In this case, the name is generated based upon the IOrder
type. If you want to change the entity name for a topic, you can add the [EntityName("order")]
to the message type, or specify a custom entity name formatter in the bus topology.
When the consumer service is started, the order-queue
queue is created. So far, so good.
Yep, published the exact message type, and the consumer gets a copy of it. That's actually something of a conundrum, because:
With c.ConfigureConsumeTopology = false;
specified, starting the consumer service will neither create the topic nor bind the topic to the queue. So this must have been done either before this line was added, or manually.
Since Subscribe("Order", ...)
is used, and the name of Order specifically, the topic name would not match the published message type. Since SNS doesn't support polymorphic message routing, publishing IOrder
would not use the Order
topic, so either you're using short hand, or you've changed the entity name for that type to Order
.
When the consumer throws an exception, MassTransit will generate a Fault<IOrder>
which will be published to the matching topic. The generic type will always match the consumed message type (not the published message type).
This results in a topic being created (if it does not already exist).
As for your consumer, since you specified c.ConfigureConsumeTopology = false;
, the receive endpoint neither creates the topic when started, nor does it create a subscription to the topic. The result is that your OrderFaultConsumer will not receive fault messages that are published.
Configuring both the consumer, and a fault consumers, on the same receive endpoint is highly discouraged. I'd suggest using a separate queue for fault consumers.
The order-queue_error queue is created because MassTransit moves all faulted messages to the error queue to prevent message loss and so that the messages can be moved back to the queue after any issues have been resolved.