c# masstransit saga

MassTransit saga running with prefetch > 1


I have a MassTransit saga state machine (derived from Automatonymous.MassTransitStateMachine) and I'm trying to work around an issue that only manifests when I set the endpoint configuration prefetchCount to a value greater than 1.

The issue is that the 'StartupCompletedEvent' is published and then immediately handled before the saga state is persisted to the database.

The state machine is configured as follows:

State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);

Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));


Initially(
    When(Requested)
        .ThenAsync(async ctx =>
        {
            Console.WriteLine("Starting up...");
            // Publish the follow-up event, correlated to this saga instance.
            await ctx.Publish(new StartupCompletedEvent { CorrelationId = ctx.Instance.CorrelationId });
            Console.WriteLine("Done starting up...");
        })
        .TransitionTo(StartingUp)
);


During(StartingUp,
    When(StartupCompleted)
        .ThenAsync(InitialiseSagaInstanceData)
        .TransitionTo(Initialising)
);

// snip...!

What happens when my saga receives the Requested event is:

  1. The ThenAsync handler of the Initially block gets hit. At this point, no saga data is persisted to the repo (as expected).
  2. StartupCompletedEvent is published to the bus. No saga data is persisted to the repo here either.
  3. The ThenAsync block of the Initially declaration completes. After this, the saga data is finally persisted.
  4. Nothing else happens.

At this point, there are no messages in the queue, and the StartupCompletedEvent is lost. However, there is a saga instance in the database.

I've played about with the start up and determined that one of the other threads (since my prefetch is > 1) has picked up the event, not found any saga with the correlationId in the database, and discarded the event. So the event is being published and handled before the saga has a chance to be persisted.
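The ordering described above can be sketched as a toy model. This is only an illustration of the sequence of events, not MassTransit code: the dictionary stands in for the saga table, and `RaceSketch` and `TryHandleStartupCompleted` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: no MassTransit types are involved, and all names are hypothetical.
public static class RaceSketch
{
    // Stands in for a second consumer receiving StartupCompletedEvent.
    // Returns true if a saga row exists for the id, false if the event would be dropped.
    public static bool TryHandleStartupCompleted(IDictionary<Guid, string> repo, Guid id)
    {
        return repo.ContainsKey(id);
    }

    public static void Main()
    {
        var repo = new Dictionary<Guid, string>(); // stands in for the saga table
        var id = Guid.NewGuid();

        // The Requested handler publishes; with prefetch > 1 another consumer may run at once.
        bool handledEarly = TryHandleStartupCompleted(repo, id);

        // Only after the handler returns is the saga row written.
        repo[id] = "StartingUp";

        Console.WriteLine(handledEarly ? "event handled" : "event dropped: no saga row yet");
    }
}
```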

If I add the following to the Initially handler:

When(StartupCompleted)
    .Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))

then the Console.WriteLine executes. My understanding is that the event has been received but routed to the Initially handler, because no saga with that correlationId exists yet. If I put a breakpoint there and check the saga repo, there is no saga yet.

It's possibly worth mentioning a few other points:

  1. I have my saga repo context set to use IsolationLevel.Serializable
  2. I'm using EntityFrameworkSagaRepository
  3. Everything works as expected when the prefetch count is set to 1
  4. I'm using Ninject for DI, and my SagaRepository is thread-scoped, so I imagine each handler that the prefetch count permits has its own copy of the saga repository
  5. If I publish the StartupCompletedEvent in a separate thread with a 1000ms sleep before it, then things work properly. I presume this is because the saga repo has completed persisting the saga state so when the event is eventually published and picked up by a handler, the saga state is retrieved from the repo correctly.

Please let me know if I've left anything out; I've tried to provide everything I think worthwhile without making this question too long...


Solution

  • I had this issue too, and I would like to post Chris' comment as an answer so people can find it.

    The solution is to enable the outbox, so that published messages are held until the saga is persisted.

    c.ReceiveEndpoint("queue", e =>
    {
        e.UseInMemoryOutbox();
        // other endpoint configuration here
    });
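
    For context, here is a minimal sketch of how the outbox might sit alongside the saga registration on the same endpoint. It assumes the Automatonymous-era MassTransit packages the question uses (namespaces differ in newer versions); `SagaEndpointSetup` and `Configure` are hypothetical names, and the state machine and repository are whatever you already have.

    ```csharp
    using Automatonymous;
    using MassTransit;
    using MassTransit.Saga;

    public static class SagaEndpointSetup
    {
        // Hypothetical helper: call it from inside the ReceiveEndpoint callback.
        public static void Configure<TInstance>(
            IReceiveEndpointConfigurator e,
            SagaStateMachine<TInstance> machine,
            ISagaRepository<TInstance> repository)
            where TInstance : class, SagaStateMachineInstance
        {
            // Buffer messages published or sent by handlers on this endpoint.
            e.UseInMemoryOutbox();

            // Register the state machine saga with its repository.
            e.StateMachineSaga(machine, repository);
        }
    }
    ```

    With this in place, the StartupCompletedEvent published from the Requested handler should only go out once that handler has finished, so another consumer cannot pick it up before the saga instance exists.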