Search code examples
rabbitmqmasstransit

How to configure multiple retry policies in MassTransit


I want two separate retry policies for a specific consumer. One for HttpRequestException and SocketException, and the other for a custom DatabaseException and unhandled SqlException. I want to do that because I want to have separate exponential retry intervals for both.

I have the following configuration:

cfg.ReceiveEndpoint(
    host,
    "ExampleQueueName",
    ec =>
    {
        ec.Consumer<ExampleConsumer>(context);
        ec.EnablePriority(5);
        ec.UseRetry(retryConfig =>
        {
            // exponential backup in minutes
            retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
            retryConfig.Handle<HttpRequestException>(x => x.IsTransient());
            retryConfig.Handle<SocketException>(x => x.IsTransient());
        });
        ec.UseRetry(retryConfig =>
        {
            // exponential backup in seconds
            retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
            retryConfig.Handle<DatabaseException>(x => x.IsTransient());
            retryConfig.Handle<SqlException>(x => x.IsTransient());
        });
    });

Presently, only the second one is used. The first one appears to be overwritten.

I've also tried configuring second level retries like so:

cfg.ReceiveEndpoint(
    host,
    "QueueName",
    ec =>
    {
        ec.Consumer<ExampleConsumer>(context, factory =>
        {
            factory.UseRetry(retryConfig =>
            {
                // exponential backup in seconds for sql and concurrency exceptions
                retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
                retryConfig.Handle<DatabaseException>(x => x.IsTransient());
                retryConfig.Handle<SqlException>(x => x.IsTransient());
            });
        });
        ec.EnablePriority(5);
        ec.UseRetry(retryConfig =>
        {
            // exponential backup in minutes for http request exceptions
            retryConfig.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
            retryConfig.Handle<DatabaseException>(x => x.IsTransient());
            retryConfig.Handle<SqlException>(x => x.IsTransient());
        });
    });

But that doesn't seem to work either. Does any know how I can apply different retry intervals for different exception types?


Solution

  • MassTransit builds everything as a pipeline, and the order of the filters matters. Rewriting your example above should fix the issue (all I did was move the Consumer to the end).

    cfg.ReceiveEndpoint("ExampleQueueName", ec =>
    {
        ec.EnablePriority(5);
        ec.UseMessageRetry(r =>
        {
            r.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
            r.Handle<HttpRequestException>(x => x.IsTransient());
            r.Handle<SocketException>(x => x.IsTransient());
        });
        ec.UseMessageRetry(r =>
        {
            r.Intervals(new[] { 1, 2, 4, 8, 16, 32 }.Select(t => TimeSpan.FromSeconds(t)).ToArray());
            r.Handle<DatabaseException>(x => x.IsTransient());
            r.Handle<SqlException>(x => x.IsTransient());
        });
        ec.ConfigureConsumer<ExampleConsumer>(context);
    });