MassTransit Middleware: OnMissingInstance equivalent for Saga without Automatonymous


I'm attempting to hand-crank a saga without using an Automatonymous state machine (which I started using but found difficult to unit test correctly), following a pattern similar to the way sagas are implemented in NServiceBus.

However, I'm having an issue where a message hits the saga before an instance has been created by the initial message. This causes MassTransit to silently swallow the message without throwing an exception or moving the message to the error queue.

While trying to find out how to solve this issue, I found a lot of people suggesting the use of OnMissingInstance to fault the message and then rely on a retry framework to effectively delay it until the saga has been correctly initialized. See here.

Is there a way to do this without using the Automatonymous framework? I imagine it would involve some middleware that preemptively checks that a saga exists for a message before the message is processed, throwing an exception (so the message is retried later) if it does not. If this isn't currently possible, does it sound like something that would be useful and feasible?

Further information:

- Using Azure Service Bus
- MongoDB for saga persistence

Some simplified code snippets of what I'm going for:

ec.Saga(new MongoDbSagaRepository<CodeProviderSaga>(sagaDatabase, new MongoDbSagaConsumeContextFactory(), nameof(CodeProviderSaga)), config =>
{
    config.UseApplicationInsights(telemetryClient);
    config.UseRetry(retryConfig => retryConfig.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(1)));
});

public class CodeProviderSaga :
    InitiatedBy<FirstEvent>,
    Orchestrates<SecondEvent>,
    IVersionedSaga
{
    [BsonId]
    public Guid CorrelationId { get; set; }
    public int Version { get; set; }
    public bool SecondEventRecieved { get; set; }
    public DateTimeOffset? LastUpdated { get; set; }

    public Task Consume(ConsumeContext<FirstEvent> context)
    {
        this.LastUpdated = DateTimeOffset.UtcNow;
        return Task.CompletedTask;
    }

    // If this arrives before FirstEvent, an exception should be thrown so the message can be retried; currently it is silently dropped
    public Task Consume(ConsumeContext<SecondEvent> context)
    {
        this.SecondEventRecieved = true;
        this.LastUpdated = DateTimeOffset.UtcNow;

        return Task.CompletedTask;
    }
}

Solution

  • To close this out, a commit was just added to the develop branch which throws an exception for messages without a matching saga that are specified in the Orchestrates<T> interface.
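
For anyone who wants to see the idea in isolation, here is a minimal, library-agnostic sketch of the behaviour described above: fault when no saga instance exists, and let a retry policy delay the message until the instance has been created. Nothing in it is MassTransit API. ISagaLookup, InMemorySagaLookup, SagaNotFoundException and SagaGuard are hypothetical names made up for illustration, and the in-memory lookup merely stands in for a repository query by CorrelationId.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical exception type (not part of MassTransit).
public sealed class SagaNotFoundException : Exception
{
    public SagaNotFoundException(Guid correlationId)
        : base($"No saga instance found for CorrelationId {correlationId}") { }
}

// Hypothetical stand-in for a saga repository lookup.
public interface ISagaLookup
{
    Task<bool> ExistsAsync(Guid correlationId);
}

public sealed class InMemorySagaLookup : ISagaLookup
{
    private readonly HashSet<Guid> _ids = new HashSet<Guid>();
    public void Add(Guid id) => _ids.Add(id);
    public Task<bool> ExistsAsync(Guid id) => Task.FromResult(_ids.Contains(id));
}

public static class SagaGuard
{
    // Runs the handler only if a saga instance exists; otherwise throws
    // so that the caller's retry policy can redeliver the message later.
    public static async Task HandleAsync(ISagaLookup lookup, Guid correlationId, Func<Task> handler)
    {
        if (!await lookup.ExistsAsync(correlationId))
            throw new SagaNotFoundException(correlationId);
        await handler();
    }

    // Retries on SagaNotFoundException, doubling the delay each time.
    // Returns the number of attempts used; once maxAttempts is reached the
    // exception propagates to the caller.
    public static async Task<int> WithRetryAsync(Func<Task> action, int maxAttempts, TimeSpan initialDelay)
    {
        var delay = initialDelay;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return attempt;
            }
            catch (SagaNotFoundException) when (attempt < maxAttempts)
            {
                await Task.Delay(delay);
                delay += delay;
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var lookup = new InMemorySagaLookup();
        var id = Guid.NewGuid();
        var handled = false;
        var calls = 0;

        var attempts = await SagaGuard.WithRetryAsync(async () =>
        {
            // Simulate the initiating message creating the saga only after
            // the first delivery of the second message has already failed.
            if (++calls == 2) lookup.Add(id);
            await SagaGuard.HandleAsync(lookup, id, () =>
            {
                handled = true;
                return Task.CompletedTask;
            });
        }, maxAttempts: 5, initialDelay: TimeSpan.FromMilliseconds(10));

        Console.WriteLine($"handled={handled} after {attempts} attempts");
    }
}
```

In a real endpoint the retry would come from the UseRetry configuration shown in the question rather than a hand-written loop; the loop is only there so the sketch runs on its own. When the attempts are exhausted the exception is rethrown, which corresponds to the message ending up in the error queue instead of being silently dropped.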