c# sql-server masstransit worker

MassTransit transactional outbox does not use the outbox in the consumer context


In the following scenario, an API publishes a message to a queue using the SqlServer transactional outbox.

Another component consumes that message and uses the SqlServer transactional outbox to trigger another message to a third component.

The process in the API works correctly and uses the outbox: the entity and the message are saved in a single transaction. I can also see that the PublishEndpointProvider has the outbox middleware.

[Screenshot: Outbox in API]

The problem appears when I try to use the same outbox configuration inside a MassTransit consumer. This is the consumer code:

public class ConsumeOrderPublished : IConsumer<OrderPublished>
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly IPaymentRepository _paymentRepository;
    private readonly IUnitOfWork _unitOfWork;

    public ConsumeOrderPublished(IPublishEndpoint publishEndpoint,
        IPaymentRepository paymentRepository, IUnitOfWork unitOfWork)
    {
        _publishEndpoint = publishEndpoint;
        _paymentRepository = paymentRepository;
        _unitOfWork = unitOfWork;
    }

    public async Task Consume(ConsumeContext<OrderPublished> context)
    {
        var orderPublished = context.Message;
        var payment = new Payment
        {
            Id = Guid.NewGuid(),
            Amount = 20,
        };

        await _unitOfWork.BeginTransactionAsync(context.CancellationToken);
        await _paymentRepository.AddAsync(payment, context.CancellationToken);
        await _publishEndpoint.Publish<OrderProcessed>(new
        {
            OrderId = orderPublished.Id,
            PaymentId = payment.Id,
        });

        await _unitOfWork.CommitTransactionAsync();
    }
}

Neither the context nor the injected IPublishEndpoint has the outbox middleware configured:

[Screenshot: Consumer context in worker]

[Screenshot: Consumer IPublishEndpoint in worker]

The behavior is different when I remove the SqlServer outbox and use the in-memory outbox. With the in-memory outbox, the middleware is configured correctly:

[Screenshot: Consumer with in-memory outbox]

This is the MassTransit configuration in the worker component; the source code is also available in this repo. I am using MassTransit v8.1.1.

public static class MasstransitWorkerExtensions
{
    public static IServiceCollection AddMasstransitWorkerConfiguration(this IServiceCollection services,
        IConfiguration configuration)
    {
        var busOptions = configuration.GetSection(BrokerOptions.SectionName).Get<BrokerOptions>();

        services.AddMassTransit(x => {
            x.AddEntityFrameworkOutbox<OrderDbContext>(o =>
            {
                o.QueryDelay = TimeSpan.FromSeconds(1);
                o.UseSqlServer();
                o.UseBusOutbox(c => c.DisableDeliveryService());
            });

            x.SetKebabCaseEndpointNameFormatter();

            x.AddConsumer<ConsumeOrderPublished>();
            x.AddConsumer<ConsumeOrderProcessed>();

            x.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(busOptions.ConnectionString);
                cfg.ConfigureEndpoints(ctx);
                // cfg.UseInMemoryOutbox();
                // When this line is enabled and the AddEntityFrameworkOutbox block above is removed, the outbox works fine.
            });
        });

        return services;
    }
}

I want to use the transactional outbox pattern inside a MassTransit consumer. I found this post, which says this possible issue has been solved.


Solution

  • You didn't configure it on any of your consumers, so it isn't going to use it. The transactional outbox must be configured for it to work.

    You can configure the outbox on all receive endpoints by adding a callback:

    x.AddConfigureEndpointsCallback((context, name, cfg) => 
    {
        cfg.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000, 10000));
        cfg.UseEntityFrameworkOutbox<OrderDbContext>(context);
    });
    

    Also, be aware that MassTransit owns and manages the transaction for the OrderDbContext when using it this way. As such, your "unit of work" should not be used.

    public async Task Consume(ConsumeContext<OrderPublished> context)
    {
        var orderPublished = context.Message;
        var payment = new Payment
        {
            Id = Guid.NewGuid(),
            Amount = 20,
        };
    
        await _paymentRepository.AddAsync(payment, context.CancellationToken);
        await context.Publish<OrderProcessed>(new
        {
            OrderId = orderPublished.Id,
            PaymentId = payment.Id,
        });
    }
    

    MassTransit will automatically start and commit the transaction for you.

    You also don't need to inject IPublishEndpoint, as the ConsumeContext passed to the Consume method already includes it.
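
Putting the two pieces of the answer together, the worker might look roughly like the sketch below. It is not a verified drop-in: it assumes the MassTransit, MassTransit.RabbitMQ and MassTransit.EntityFrameworkCore packages at v8.1.x, it reuses the question's own types (OrderDbContext, BrokerOptions, IPaymentRepository, Payment, OrderPublished, OrderProcessed, ConsumeOrderProcessed), and it assumes the repository writes through the same scoped OrderDbContext that the outbox is registered for.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class MasstransitWorkerExtensions
{
    public static IServiceCollection AddMasstransitWorkerConfiguration(
        this IServiceCollection services, IConfiguration configuration)
    {
        var busOptions = configuration.GetSection(BrokerOptions.SectionName).Get<BrokerOptions>();

        services.AddMassTransit(x =>
        {
            // Outbox registration, copied unchanged from the question.
            x.AddEntityFrameworkOutbox<OrderDbContext>(o =>
            {
                o.QueryDelay = TimeSpan.FromSeconds(1);
                o.UseSqlServer();
                o.UseBusOutbox(c => c.DisableDeliveryService());
            });

            x.SetKebabCaseEndpointNameFormatter();

            x.AddConsumer<ConsumeOrderPublished>();
            x.AddConsumer<ConsumeOrderProcessed>();

            // From the answer: apply retry and the EF outbox to every
            // receive endpoint that ConfigureEndpoints creates.
            x.AddConfigureEndpointsCallback((context, name, cfg) =>
            {
                cfg.UseMessageRetry(r => r.Intervals(100, 500, 1000, 5000, 10000));
                cfg.UseEntityFrameworkOutbox<OrderDbContext>(context);
            });

            x.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(busOptions.ConnectionString);
                cfg.ConfigureEndpoints(ctx);
            });
        });

        return services;
    }
}

// Consumer without IPublishEndpoint or IUnitOfWork: publishing goes through
// the ConsumeContext, and the transaction is left to MassTransit.
public class ConsumeOrderPublished : IConsumer<OrderPublished>
{
    private readonly IPaymentRepository _paymentRepository;

    public ConsumeOrderPublished(IPaymentRepository paymentRepository)
    {
        _paymentRepository = paymentRepository;
    }

    public async Task Consume(ConsumeContext<OrderPublished> context)
    {
        var payment = new Payment
        {
            Id = Guid.NewGuid(),
            Amount = 20,
        };

        await _paymentRepository.AddAsync(payment, context.CancellationToken);

        // Published via the consume context so the message goes through the outbox.
        await context.Publish<OrderProcessed>(new
        {
            OrderId = context.Message.Id,
            PaymentId = payment.Id,
        });
    }
}
```

The callback is registered before UsingRabbitMq here only for readability; the key point from the answer is that the outbox has to be applied to the receive endpoints, not just registered with AddEntityFrameworkOutbox.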