c# state-machine masstransit saga

MassTransit UseInMemoryOutbox not working with MultiBus saga?


I implemented a custom activity to support a MultiBus saga. However, I found an issue: a message published on SecondBus is received before the saga state is saved to Redis. I thought UseInMemoryOutbox() should handle this, or am I wrong?

public class TestActivity : Activity<SagaState, IFirstBusRequest>
{
    private readonly ISecondBus _secondBus;

    public TestActivity(ISecondBus secondBus)
    {
        _secondBus = secondBus;
    }


    public async Task Execute(BehaviorContext<SagaState, IFirstBusRequest> context, Behavior<SagaState, IFirstBusRequest> next)
    {
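        // Resolve the send endpoint on the second bus and send right away.
        var endpoint = await _secondBus.GetSendEndpoint(new Uri("queue:second-bus-request"));
        await endpoint.Send(new { }); // send immediately

        // Continue with the next activity in the behavior.
        await next.Execute(context);
    }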
}

Solution

  • Correct, this is a known limitation: the other bus instance is a completely separate bus, so messages produced on it are not part of the outbox on the receiving bus.

    If you need the outbox to be involved, consider producing the message on the first bus and adding a consumer on that bus whose only job is to forward the message to the other bus.

    public class ForwardMessageConsumer<T> :
        IConsumer<T>
        where T : class
    {
        readonly ISecondBus _bus;
    
        public ForwardMessageConsumer(ISecondBus bus)
        {
            _bus = bus;
        }
    
        public async Task Consume(ConsumeContext<T> context)
        {
            var messagePipe = new ForwardMessagePipe<T>(context);
    
            await _bus.Publish(context.Message, messagePipe, context.CancellationToken);
        }
    }
    

    You could then host those message forwarders on a single endpoint:

    x.AddConsumer<ForwardMessageConsumer<A>>()
        .Endpoint(e => e.Name = "second-bus-forwarder");
    x.AddConsumer<ForwardMessageConsumer<B>>()
        .Endpoint(e => e.Name = "second-bus-forwarder");
    

    ConfigureEndpoints will then put those consumers on a receive endpoint that forwards the messages to the other bus instance.

    Giving the forwarders the same endpoint name puts multiple consumers on the same queue, which avoids having several queues that do nothing but forward messages. This assumes the activity uses Publish on the first bus, so the messages reach the forwarder by message type rather than by queue address.
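To illustrate the "Publish from the activity" part, here is a minimal sketch of how the activity from the question might look when it publishes on the first bus instead of sending on ISecondBus. It assumes the Automatonymous-based activity API that the question's `Activity<SagaState, IFirstBusRequest>` signature suggests, and that the scoped `ConsumeContext` of the first bus can be injected through the container. `ISecondBusRequest` and `PublishToForwarderActivity` are hypothetical names used only for illustration; `SagaState` and `IFirstBusRequest` are the types from the question and are not redefined here.

```csharp
using System;
using System.Threading.Tasks;
using Automatonymous;
using GreenPipes;
using MassTransit;

// Hypothetical message contract (not in the original post).
public interface ISecondBusRequest
{
    Guid CorrelationId { get; }
}

public class PublishToForwarderActivity :
    Activity<SagaState, IFirstBusRequest>
{
    readonly ConsumeContext _context;

    // Assumption: the container supplies the ConsumeContext of the message
    // currently being handled by the saga on the first bus.
    public PublishToForwarderActivity(ConsumeContext context)
    {
        _context = context;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-to-forwarder");
    }

    public void Accept(StateMachineVisitor visitor)
    {
        visitor.Visit(this);
    }

    public async Task Execute(BehaviorContext<SagaState, IFirstBusRequest> context,
        Behavior<SagaState, IFirstBusRequest> next)
    {
        // Published through the first bus's consume context, so with
        // UseInMemoryOutbox() the message is held until the saga completes.
        await _context.Publish<ISecondBusRequest>(new
        {
            context.Instance.CorrelationId
        });

        // Continue with the next activity in the behavior.
        await next.Execute(context);
    }

    public Task Faulted<TException>(
        BehaviorExceptionContext<SagaState, IFirstBusRequest, TException> context,
        Behavior<SagaState, IFirstBusRequest> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }
}
```

With this in place, registering `ForwardMessageConsumer<ISecondBusRequest>` on the "second-bus-forwarder" endpoint would hand the message to the second bus only after the first bus's outbox releases it.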