I am trying to use state machines by using Redis
as the persistence layer. I understand that Redis saga repository does not support queries
, but I need to have requests within my state machine. How do I register them?
This is my configuration
builder.Services.AddMassTransit(x =>
{
x.SetRedisSagaRepositoryProvider(r =>
{
r.DatabaseConfiguration("localhost");
r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
r.KeyPrefix = "mass-transit-dev";
// Optional, to customize the lock key
r.LockSuffix = "-lock";
// Optional, the default is 30 seconds
r.LockTimeout = TimeSpan.FromSeconds(90);
});
x.AddDelayedMessageScheduler();
Assembly? entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.AddSagaStateMachines(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
And this is my state machine.
public BookReturnStateMachine()
{
Event(() => BookReturned, x => x.CorrelateById(m => m.Message.CorrelationId));
Request(() => ChargeFine, x => x.FineRequestId, x =>
{
// x.(m => m.Message.CheckoutId);
//x.Timeout = TimeSpan.FromSeconds(10);
});
InstanceState(x => x.CurrentState);
Initially(
When(BookReturned)
.Then(context =>
{
context.Saga.ReturnDate = context.Message.ReturnDate;
context.Saga.DueDate = context.Message.DueDate;
context.Saga.BookId = context.Message.BookId;
context.Saga.CheckoutId = context.Message.CheckoutId;
})
.IfElse(context => context.Saga.ReturnDate > context.Saga.DueDate,
late => late.Request(ChargeFine, context => context.Init<ChargeFine>(new
{
MemberId = Guid.NewGuid(),
Amount = 10
}))
.Then(ctx => Console.WriteLine("It is late, request sent and transitioning to ChargingFine state"))
.TransitionTo(ChargingFine),
onTime => onTime
.Then(ctx => Console.WriteLine("It is on time, transitioning to final state"))
.TransitionTo(Final))
);
During(ChargingFine,
When(ChargeFine.Completed)
.Then(ctx => Console.WriteLine("Member has been charged successfully, transitioning to final state"))
.TransitionTo(Final),
When(ChargeFine.Faulted)
.TransitionTo(FailedToFindMember),
When(ChargeFine.TimeoutExpired)
.TransitionTo(FailedToFindMember)
);
}
public Event<BookReturned> BookReturned { get; private set; }
public State ChargingFine { get; private set; }
public State FailedToFindMember { get; private set; }
public Request<BookReturn, ChargeFine, FineCharged> ChargeFine { get; private set; }
}
public class BookReturn : SagaStateMachineInstance, ISagaVersion
{
public Guid CheckoutId { get; set; }
public Guid BookId { get; set; }
public DateTime ReturnDate { get; set; }
public DateTime DueDate { get; set; }
public string CurrentState { get; set; }
public Guid CorrelationId { get; set; }
public Guid? FineRequestId { get; set; }
public int Version { get; set; }
}
public record BookReturned
{
public Guid CorrelationId { get; init; }
public Guid CheckoutId { get; init; }
public Guid BookId { get; init; }
public DateTime ReturnDate { get; init; }
public DateTime DueDate { get; init; }
}
public record ChargeFine
{
public Guid CorrelationId { get; init; }
public Guid MemberId { get; set; }
public decimal Amount { get; set; }
}
public record FineCharged
{
public Guid CorrelationId { get; init; }
public Guid MemberId { get; set; }
public decimal Amount { get; set; }
}
To me it seems like Request(() => ..)
registration only accept expressions which is not supported by Redis
. Does it mean that you cannot use Redis
in this scenario or am I missing something?
Request(() => ChargeFine, x =>
{
x.Timeout = TimeSpan.Zero;
});
This will use the CorrelationId
of the saga as the requestId.