I have a MassTransit saga state machine (derived from Automatonymous.MassTransitStateMachine) and I'm trying to work around an issue that only manifests when I set the endpoint configuration prefetchCount to a value greater than 1.
The issue is that the 'StartupCompletedEvent' is published and then immediately handled before the saga state is persisted to the database.
The state machine is configured as follows:
State(() => Initialising);
State(() => StartingUp);
State(() => GeneratingFiles);
Event(() => Requested, x => x.CorrelateById(ctx => ctx.Message.CorrelationId).SelectId(ctx => ctx.Message.CorrelationId));
Event(() => StartupCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => InitialisationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Event(() => FileGenerationCompleted, x => x.CorrelateById(ctx => ctx.Message.CorrelationId));
Initially(
    When(Requested)
        .ThenAsync(async ctx =>
        {
            Console.WriteLine("Starting up...");
            await ctx.Publish(new StartupCompletedEvent() { CorrelationId = ctx.Instance.CorrelationId });
            Console.WriteLine("Done starting up...");
        })
        .TransitionTo(StartingUp)
);
During(StartingUp,
    When(StartupCompleted)
        .ThenAsync(InitialiseSagaInstanceData)
        .TransitionTo(Initialising)
);
// snip...!
What happens when my saga receives the Requested event is:
At this point, there are no messages in the queue, and the StartupCompletedEvent is lost. However, there is a saga instance in the database.
I've played about with the start up and determined that one of the other threads (since my prefetch is > 1) has picked up the event, not found any saga with the correlationId in the database, and discarded the event. So the event is being published and handled before the saga has a chance to be persisted.
If I add the following to the Initially handler:
When(StartupCompleted)
.Then(ctx => Console.WriteLine("Got the startup completed event when there is no saga instance"))
With that in place, the Console.WriteLine executes. My understanding is that the event has been received but routed to the Initially handler, because no saga with that correlationId exists yet. If I put a breakpoint there and check the saga repository, there is no saga yet.
It's possibly worth mentioning a few other points:
Please let me know if I've left anything out; I've tried to provide everything I think worthwhile without making this question too long...
I had this issue too, and I would like to post Chris' comment as an answer so people can find it.
The solution is to enable the Outbox so that published messages are held until the saga is persisted.
c.ReceiveEndpoint("queue", e =>
{
    e.UseInMemoryOutbox();
    // other endpoint configuration here
});
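For context, here is a rough sketch of how the outbox might sit next to the prefetch setting and the saga registration from the question. It is a configuration fragment, not a complete program: `machine` and `repository` are hypothetical variables standing in for your state machine instance and saga repository, the prefetch value of 16 is arbitrary, and the parameterless `UseInMemoryOutbox()` is assumed to match the Automatonymous-era MassTransit versions the question appears to use.

```csharp
// Fragment only: 'c' is the bus factory configurator, as in the snippet above.
// 'machine' and 'repository' are hypothetical placeholders for your own
// state machine instance and saga repository.
c.ReceiveEndpoint("queue", e =>
{
    // Any value greater than 1 allows messages to be handled concurrently,
    // which is the condition under which the question's race appears.
    e.PrefetchCount = 16;

    // Holds messages published while handling a message until that handling
    // completes, so StartupCompletedEvent is not released before the saga
    // instance has been saved.
    e.UseInMemoryOutbox();

    // Register the saga on the same endpoint, after the outbox.
    e.StateMachineSaga(machine, repository);
});
```

With this in place, the `ctx.Publish(...)` call inside the `Initially` handler should no longer reach the broker before the new saga instance exists in the repository.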