Search code examples
masstransit

How to register Redis as persistence layer for MassTransit a state machine that uses request


I am trying to use state machines by using Redis as the persistence layer. I understand that Redis saga repository does not support queries, but I need to have requests within my state machine. How do I register them?

This is my configuration

builder.Services.AddMassTransit(x =>
{
    x.SetRedisSagaRepositoryProvider(r =>
    {
        r.DatabaseConfiguration("localhost");
        r.ConcurrencyMode = ConcurrencyMode.Pessimistic;

        r.KeyPrefix = "mass-transit-dev";

        // Optional, to customize the lock key
        r.LockSuffix = "-lock";

        // Optional, the default is 30 seconds
        r.LockTimeout = TimeSpan.FromSeconds(90);
    });

    x.AddDelayedMessageScheduler();

    Assembly? entryAssembly = Assembly.GetEntryAssembly();

    x.AddConsumers(entryAssembly);
    x.AddSagaStateMachines(entryAssembly);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();

        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});

And this is my state machine.

    public BookReturnStateMachine()
    {
        Event(() => BookReturned, x => x.CorrelateById(m => m.Message.CorrelationId));

        Request(() => ChargeFine, x => x.FineRequestId, x =>
        {
            // x.(m => m.Message.CheckoutId);
            //x.Timeout = TimeSpan.FromSeconds(10);
        });

        InstanceState(x => x.CurrentState);

        Initially(
            When(BookReturned)
                .Then(context =>
                {
                    context.Saga.ReturnDate = context.Message.ReturnDate;
                    context.Saga.DueDate = context.Message.DueDate;
                    context.Saga.BookId = context.Message.BookId;
                    context.Saga.CheckoutId = context.Message.CheckoutId;
                })
                .IfElse(context => context.Saga.ReturnDate > context.Saga.DueDate,
                late => late.Request(ChargeFine, context => context.Init<ChargeFine>(new
                {
                    MemberId = Guid.NewGuid(),
                    Amount = 10
                }))
                .Then(ctx => Console.WriteLine("It is late, request sent and transitioning to ChargingFine state"))
                .TransitionTo(ChargingFine),
                onTime => onTime
                .Then(ctx => Console.WriteLine("It is on time, transitioning to final state"))
                .TransitionTo(Final))
        );

        During(ChargingFine,
            When(ChargeFine.Completed)
            .Then(ctx => Console.WriteLine("Member has been charged successfully, transitioning to final state"))
                .TransitionTo(Final),

            When(ChargeFine.Faulted)
                .TransitionTo(FailedToFindMember),

            When(ChargeFine.TimeoutExpired)
                .TransitionTo(FailedToFindMember)
        );
    }

    public Event<BookReturned> BookReturned { get; private set; }

    public State ChargingFine { get; private set; }

    public State FailedToFindMember { get; private set; }

    public Request<BookReturn, ChargeFine, FineCharged> ChargeFine { get; private set; }
}

public class BookReturn : SagaStateMachineInstance, ISagaVersion
{
    public Guid CheckoutId { get; set; }

    public Guid BookId { get; set; }

    public DateTime ReturnDate { get; set; }

    public DateTime DueDate { get; set; }

    public string CurrentState { get; set; }
    public Guid CorrelationId { get; set; }
    public Guid? FineRequestId { get; set; }
    public int Version { get; set; }
}

public record BookReturned
{
    public Guid CorrelationId { get; init; }

    public Guid CheckoutId { get; init; }

    public Guid BookId { get; init; }

    public DateTime ReturnDate { get; init; }

    public DateTime DueDate { get; init; }
}

public record ChargeFine
{
    public Guid CorrelationId { get; init; }

    public Guid MemberId { get; set; }

    public decimal Amount { get; set; }
}

public record FineCharged
{
    public Guid CorrelationId { get; init; }

    public Guid MemberId { get; set; }

    public decimal Amount { get; set; }
}

To me it seems like Request(() => ..) registration only accept expressions which is not supported by Redis. Does it mean that you cannot use Redis in this scenario or am I missing something?


Solution

  • Request(() => ChargeFine, x =>
    {
        x.Timeout = TimeSpan.Zero;
    });
    

    This will use the CorrelationId of the saga as the requestId.