state-machine, masstransit, saga, automatonymous

How to persist Saga instances using storage engines and avoid race conditions


I tried persisting saga instances using RedisSagaRepository, because I want to run the saga in a load-balanced setup and therefore cannot use InMemorySagaRepository. However, after switching, I noticed that some of the events published by consumers were not processed by the saga. I checked the queue and did not see any messages.

This seems most likely to happen when the consumer takes little to no time to process the command and publish the event. The issue does not occur if I use InMemorySagaRepository or add a Task.Delay() in Consumer.Consume().

Am I using it incorrectly?

Also, suppose I run the saga in a load-balanced setup and the saga needs to send multiple commands of the same type, using a dictionary to track completeness (similar logic to Handling transition to state for multiple events). When multiple consumers publish events at the same time, would I have a race condition if two saga instances process two different events concurrently? In that case, would the dictionary in the state object be set correctly?

The code is available here

SagaService.ConfigureSagaEndPoint() is where I switch between InMemorySagaRepository and RedisSagaRepository

private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
    var stateMachine = new MySagaStateMachine();

    try
    {
        var redisConnectionString = "192.168.99.100:6379";
        var redis = ConnectionMultiplexer.Connect(redisConnectionString);

        // If we switch to RedisSagaRepository and the consumer publishes its response too quickly,
        // the consumer's published event seems to reach the saga before the saga state is updated.
        // When that happens, the saga does not process the response event because it is not yet in the "Processing" state.
        //var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
        var repository = new InMemorySagaRepository<SagaState>();

        endpointConfigurator.StateMachineSaga(stateMachine, repository);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

LeafConsumer.Consume() is where I add the Task.Delay():

public class LeafConsumer : IConsumer<IConsumerRequest>
{
    public async Task Consume(ConsumeContext<IConsumerRequest> context)
    {
        // If the MySaga project is using RedisSagaRepository, uncomment the await Task.Delay() below;
        // otherwise, the event published by this consumer does not seem to be processed.
        // With InMemorySagaRepository, the code works without Task.Delay.
        // Maybe I am doing something wrong in these projects,
        // or in real life a consumer would usually take a few milliseconds to complete.
        // However, we cannot predict the latency between the saga and Redis.
        //await Task.Delay(1000);

        Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
        await context.Publish<IConsumerProcessed>(new
        {
            context.Message.CorrelationId,
        });
    }
}

Solution

  • When events are published this way and you are running multiple service instances with a non-transactional saga repository (such as Redis), you need to design your saga so that a unique identifier is used and enforced by Redis. This prevents multiple instances of the same saga from being created.

    You also need to accept events in more than the "expected" state. For instance, expecting to receive a Start event, which puts the saga into a processing state, before receiving another event that is only accepted in that processing state, is likely to fail. To avoid out-of-order message delivery issues, it is recommended to allow the saga to be started (Initially, in Automatonymous) by any event in the sequence. As long as every event moves the dial from left to right, the eventual state will be reached. If an earlier event is received after a later one, it shouldn't move the state backwards (to the left, in this example); it should only add information to the saga instance and leave it in the later state.

    If two events are processed on separate service instances, both will try to insert the saga instance into Redis, and one insert will fail as a duplicate. The failed message should then be retried (add UseMessageRetry() to your receive endpoint); the retry will load the now-existing saga instance and apply the event to it.
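The "move the dial from left to right" rule can be illustrated without MassTransit at all. The sketch below is a plain C# model, not Automatonymous code: the `Progress` enum, the `OrderIndependentSaga` class, the `OrderDemo` helper and the event names are all hypothetical. It assumes the states can be ordered, accepts every event in every state (including as the first event for a new instance), always records the event's data, and only ever moves the state forward. In Automatonymous terms this roughly corresponds to handling each event in `Initially(...)` as well as in the later states, rather than only in one "expected" state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, ordered states: a higher value is further "to the right".
public enum Progress
{
    Initial = 0,
    Processing = 1,
    Completed = 2
}

// Hypothetical saga instance (not the SagaState class from the question).
public sealed class OrderIndependentSaga
{
    public Guid CorrelationId { get; }
    public Progress State { get; private set; } = Progress.Initial;

    // Information contributed by every event, in arrival order.
    public List<string> Received { get; } = new List<string>();

    public OrderIndependentSaga(Guid correlationId) => CorrelationId = correlationId;

    // Any event is accepted in any state, including as the first event for a new instance.
    // Its data is always recorded, but the state only moves forward, never back.
    public void Apply(string eventName, Progress reachedState)
    {
        Received.Add(eventName);
        if (reachedState > State)
        {
            State = reachedState;
        }
    }
}

public static class OrderDemo
{
    // Applies the events in the given (possibly out-of-order) delivery order.
    public static OrderIndependentSaga Run(params (string Name, Progress ReachedState)[] deliveries)
    {
        var saga = new OrderIndependentSaga(Guid.NewGuid());
        foreach (var (name, reachedState) in deliveries)
        {
            saga.Apply(name, reachedState);
        }
        return saga;
    }
}
```

Whether the "Started" event arrives before or after the "ConsumerProcessed" event, both orders end in `Progress.Completed` with both events recorded, which is exactly the situation where the fast consumer's event beats the saga's own state update.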
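The duplicate-insert-then-retry flow can be modelled in plain C# as well. This is a minimal sketch under stated assumptions, not MassTransit's RedisSagaRepository: `UniqueSagaStore`, `SagaInstance`, `DuplicateSagaException` and `SagaHandler` are hypothetical, `ConcurrentDictionary.TryAdd` stands in for the store rejecting a second saga with the same CorrelationId, and the loop in `HandleWithRetry` stands in for what `UseMessageRetry()` does on the receive endpoint (redeliver the message so the handler runs again).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

#nullable enable

// Hypothetical error raised when a saga with the same CorrelationId already exists.
public sealed class DuplicateSagaException : Exception
{
    public DuplicateSagaException(Guid id) : base($"Saga {id} already exists") { }
}

// Hypothetical saga instance; Events records which events have been applied.
public sealed class SagaInstance
{
    public Guid CorrelationId { get; }
    public List<string> Events { get; } = new List<string>();
    public SagaInstance(Guid id) => CorrelationId = id;
}

// Hypothetical store that enforces one saga per CorrelationId (the role Redis plays in the answer).
public sealed class UniqueSagaStore
{
    private readonly ConcurrentDictionary<Guid, SagaInstance> _sagas =
        new ConcurrentDictionary<Guid, SagaInstance>();

    public SagaInstance? Find(Guid id) => _sagas.TryGetValue(id, out var saga) ? saga : null;

    // Insert-only: fails if another service instance created the saga first.
    public void Insert(SagaInstance saga)
    {
        if (!_sagas.TryAdd(saga.CorrelationId, saga))
        {
            throw new DuplicateSagaException(saga.CorrelationId);
        }
    }
}

public static class SagaHandler
{
    // One delivery attempt. 'loaded' is what this service instance saw when it looked
    // up the saga, which may already be stale by the time it saves.
    public static void Attempt(UniqueSagaStore store, Guid id, string evt, SagaInstance? loaded)
    {
        if (loaded == null)
        {
            var created = new SagaInstance(id);
            created.Events.Add(evt);
            store.Insert(created); // may throw DuplicateSagaException
        }
        else
        {
            loaded.Events.Add(evt); // a real repository would persist this update
        }
    }

    // Rough analogue of message retry: on a duplicate insert, reload the saga and try again.
    // Returns the number of attempts that were needed.
    public static int HandleWithRetry(UniqueSagaStore store, Guid id, string evt,
        SagaInstance? firstLoad, int maxAttempts = 3)
    {
        var loaded = firstLoad;
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                Attempt(store, id, evt, loaded);
                return attempt;
            }
            catch (DuplicateSagaException) when (attempt < maxAttempts)
            {
                // The saga exists now, so the next attempt applies the event to it.
                loaded = store.Find(id);
            }
        }
    }
}
```

To reproduce the race deterministically, call `store.Find(id)` twice before handling either event (both "service instances" see no saga), then pass each result to `HandleWithRetry`: the first event is inserted on attempt 1, the second hits the duplicate, is retried, and is applied to the existing instance on attempt 2, so no event is lost.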