Search code examples
azure-functionsazureservicebusmasstransitazure-servicebus-topics

MassTransit Azure Service Bus - Consuming faults using Topics/Subscriptions


I am currently testing out MassTransit exception handling, and am struggling to get Fault messages published. Ideally I'd like a Subscription consumer to retry and then if a fault is thrown, have another consumer pick this up, so far I have a simple setup:

  1. A test consumer with retry definition:
public class TestConsumer : IConsumer<MyEvent>
{
    public async Task Consume(ConsumeContext<MyEvent> context)
    {
        throw new Exception("Whoops");
    }
}

public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TestConsumer> consumerConfigurator)
    {
        base.ConfigureConsumer(endpointConfigurator, consumerConfigurator);
        consumerConfigurator.UseMessageRetry(configureRetry => configureRetry.Interval(5, TimeSpan.FromMilliseconds(500)));
    }
  1. A fault consumer:
public class TestFaultConsumer : IConsumer<Fault<MyEvent>>
{
    public async Task Consume(ConsumeContext<Fault<MyEvent>> context)
    {
        return; // Handle something here
    }
}

Registered as follows:

services.AddMassTransit(busRegistrationConfigurator =>
        {
            busRegistrationConfigurator.AddConsumer<TestConsumer, TestConsumerDefinition>();
            busRegistrationConfigurator.AddConsumer<TestFaultConsumer>();

            busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusFactoryConfigurator) =>
            {
                serviceBusFactoryConfigurator.Host(configuration["ServiceBusSettings:ConnectionString"]);

                serviceBusFactoryConfigurator.SubscriptionEndpoint("test-consumer", "my-topic", cfg =>
                {
                    cfg.ConfigureConsumer<TestConsumer>(busRegistrationContext);
                });
            });
        });

When the exception is thrown in the first consumer, there is no fault queue created in ASB. Is there a piece of setup I am missing to configure the error queue in ASB and have the Fault be consumed?

Also as a final aside, I'd also like to get this Fault consumption working within an Azure function, is this possible or is this not supported with ASB/Functions?

Any help would be great, thanks


Solution

  • First, your consumer definition should be configuring retry on the endpoint, not the consumer:

    public class TestConsumerDefinition : ConsumerDefinition<TestConsumer>
    {
        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TestConsumer> consumerConfigurator)
        {
            endpointConfigurator.UseMessageRetry(configureRetry => configureRetry.Interval(5, TimeSpan.FromMilliseconds(500)));
        }
    }
    

    Second, you aren't calling ConfigureEndpoints, so there won't be an endpoint created for the fault consumer as it isn't configured. You should configure it after the subscription endpoint is configured:

    busRegistrationConfigurator.UsingAzureServiceBus((busRegistrationContext, serviceBusFactoryConfigurator) =>
    {    serviceBusFactoryConfigurator.Host(configuration["ServiceBusSettings:ConnectionString"]);
    
        serviceBusFactoryConfigurator.SubscriptionEndpoint("test-consumer", "my-topic", cfg =>
        {
            cfg.ConfigureConsumer<TestConsumer>(busRegistrationContext);
        });
    
        serviceBusFactoryConfigurator.ConfigureEndpoints(busRegistrationContext);
    });
    

    As to the part about Azure Functions, that would require configuring everything in Azure Service Bus manually, since AF is the transport and requires everything to exist up front. There is a sample, that might give you some guidance.

    I am not a fan of Azure Functions, as I find the deployment model painful, but it is supported by MassTransit.