I am currently testing MassTransit exception handling and am struggling to get Fault messages published. Ideally, I'd like a subscription consumer to retry and, if it still faults, have another consumer pick up the fault. So far I have a simple setup:
public class TestConsumer : IConsumer<MyEvent>
{
    public async Task Consume(ConsumeContext<MyEvent> context)
    {
        throw new Exception("Whoops");
    }
}
public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TestConsumer> consumerConfigurator)
    {
        base.ConfigureConsumer(endpointConfigurator, consumerConfigurator);
        consumerConfigurator.UseMessageRetry(configureRetry => configureRetry.Interval(5, TimeSpan.FromMilliseconds(500)));
    }
}
public class TestFaultConsumer : IConsumer<Fault<MyEvent>>
{
    public async Task Consume(ConsumeContext<Fault<MyEvent>> context)
    {
        return; // Handle something here
    }
}
Registered as follows:
services.AddMassTransit(busRegistrationConfigurator =>
{
    busRegistrationConfigurator.AddConsumer<TestConsumer, TestConsumerDefinition>();
    busRegistrationConfigurator.AddConsumer<TestFaultConsumer>();
    busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusFactoryConfigurator) =>
    {
        serviceBusFactoryConfigurator.Host(configuration["ServiceBusSettings:ConnectionString"]);
        serviceBusFactoryConfigurator.SubscriptionEndpoint("test-consumer", "my-topic", cfg =>
        {
            cfg.ConfigureConsumer<TestConsumer>(busRegistrationContext);
        });
    });
});
When the exception is thrown in the first consumer, there is no fault queue created in ASB. Is there a piece of setup I am missing to configure the error queue in ASB and have the Fault be consumed?
As a final aside, I'd also like to get this Fault consumption working within an Azure Function. Is this possible, or is it not supported with ASB/Functions?
Any help would be great, thanks
First, your consumer definition should be configuring retry on the endpoint, not the consumer:
public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TestConsumer> consumerConfigurator)
    {
        // Retry is configured on the receive endpoint rather than on the consumer.
        endpointConfigurator.UseMessageRetry(configureRetry => configureRetry.Interval(5, TimeSpan.FromMilliseconds(500)));
    }
}
Second, you aren't calling ConfigureEndpoints, so no endpoint is created for the fault consumer: it is registered but never configured on a receive endpoint. Call ConfigureEndpoints after the subscription endpoint is configured:
busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusFactoryConfigurator) =>
{
    serviceBusFactoryConfigurator.Host(configuration["ServiceBusSettings:ConnectionString"]);
    serviceBusFactoryConfigurator.SubscriptionEndpoint("test-consumer", "my-topic", cfg =>
    {
        cfg.ConfigureConsumer<TestConsumer>(busRegistrationContext);
    });
    // Creates endpoints for the remaining registered consumers, including TestFaultConsumer.
    serviceBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
});
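Once the fault consumer has an endpoint, its Consume method can inspect what went wrong. The following is only a minimal sketch: it assumes MyEvent is the message type from the question, and writing to the console is purely illustrative (a real handler would likely log or raise an alert instead).

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Sketch of a fault consumer body; MyEvent is assumed to be the question's message type.
public class TestFaultConsumer : IConsumer<Fault<MyEvent>>
{
    public Task Consume(ConsumeContext<Fault<MyEvent>> context)
    {
        Fault<MyEvent> fault = context.Message;

        // fault.Message holds the original MyEvent that could not be processed.
        // fault.Exceptions describes what the consumer threw (type, message, stack trace).
        foreach (ExceptionInfo exception in fault.Exceptions)
        {
            Console.WriteLine($"{fault.FaultedMessageId}: {exception.ExceptionType}: {exception.Message}");
        }

        return Task.CompletedTask;
    }
}
```

With retry configured on the endpoint, the fault should only be published after the retry attempts are exhausted, so this consumer would see one fault per failed message rather than one per attempt.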
As for Azure Functions, that would require configuring everything in Azure Service Bus manually, since Azure Functions acts as the transport and requires the topics, subscriptions, and queues to exist up front. There is a sample that might give you some guidance.
I am not a fan of Azure Functions, as I find the deployment model painful, but it is supported by MassTransit.