Search code examples
masstransitautomatonymous

Restart a MassTransit Saga


In my application, user can trigger updates for a specific resource. When that happens, an UpdateInitiated event is sent.

A MassTransit Saga is initiated based on that event, and it triggers a series of other events until it considers itself done and finalizes.

While an update is ongoing, users can make changes to the resource and request a new update. I'm trying to figure out how I can abort the ongoing Saga and create a new instance, when a new UpdateInitiated event is sent for the same resource.

Two approaches I'm thinking about

  1. Correlate on ResourceId. So the initial Saga would get the second UpdateInitiated event as well. But then I don't know how to reset the saga (and stop listening for events related to the previous update).

  2. Use CorrelatedBy<Guid> (as I am doing now). This would create a new Saga for the new update event. But then I don't know how to cancel the initial Saga.

Is there a way to listen to arbitrary events in a Saga? Like: "Listen to all UpdatedInitiated events even if they don't correlate, and cancel the current Saga if there is an event with the matching ResourceId".


Solution

  • I realized that it's possible to register for events where you correlate on something other than the CorrelationId. In my use case, I want to abort an ongoing saga if there is a new event kicking off a new saga, for the same ResourceId.

    What I did was this:

    // Listen to update initiated events for the same ResourceId
    Event(() => UpdateInitiated , e => e
        .CorrelateBy((instance, context) =>
            instance.CurrentState != 2 &&
            instance.CorrelationId != context.Message.CorrelationId &&
            instance.ResourceId == context.Message.ResourceId
        )
        .SelectId(x => NewId.NextGuid()));
    
    // Finalize this saga if there is a newer one for the same ResourceId
    DuringAny(
        When(UpdateInitiated)
            .Then(context => Log.Information("Saga aborted due to newer update initiated by {correlationId}", context.Data.CorrelationId))
            .Finalize());