I am trying to configure Automatonymous worker implementation with EF Core as persistence. I publish event via api and process it in hosted service using RabbitMq as a transport. Unfortunately database does not store state of machine. I applied migrations and I see table OrderState
, but there is no data in it after I publish OrderSubmmited
event. It gets processed properly, because I see a log in a console, but database table remains empty. What am I missing?
This is my code:
Event:
public interface OrderSubmitted : CorrelatedBy<Guid>
{
}
Consumer:
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
public Task Consume(ConsumeContext<OrderSubmitted> context)
{
Console.Out.WriteLine($"Order with id {context.Message.CorrelationId} has been submitted.");
return Task.CompletedTask;
}
}
My saga instance:
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
}
State machine:
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public Event<OrderSubmitted> OrderSubmitted { get; set; }
public OrderStateMachine()
{
Event(() => OrderSubmitted);
InstanceState(x => x.CurrentState);
Initially(
When(OrderSubmitted)
.TransitionTo(Submitted));
DuringAny(
When(OrderSubmitted)
.TransitionTo(Submitted));
}
}
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
{
public OrderStateMachineDefinition()
{
ConcurrentMessageLimit = 15;
}
}
DbContext:
public class OrderStateDbContext : SagaDbContext
{
public OrderStateDbContext(DbContextOptions options) : base(options)
{
}
protected override IEnumerable<ISagaClassMap> Configurations
{
get
{
yield return new OrderStateMap();
}
}
}
public class OrderStateMap : SagaClassMap<OrderState>
{
protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
{
entity.Property(x => x.CurrentState).HasMaxLength(64);
}
}
Program class:
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddDbContext<OrderStateDbContext>(builder =>
builder.UseSqlServer("Server=(localdb)\\mssqllocaldb;Database=OrderState;Trusted_Connection=True;", m =>
{
m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
}));
services.AddMassTransit(config =>
{
config.AddSagaRepository<OrderState>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<OrderStateDbContext>();
r.LockStatementProvider = new SqlServerLockStatementProvider();
});
config.AddConsumer<OrderSubmittedConsumer>();
config.UsingRabbitMq((ctx, cfg) => {
cfg.Host("amqp://guest:guest@localhost:5672");
cfg.ReceiveEndpoint("order-queue", c => {
c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
// I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
});
});
});
services.AddHostedService<Worker>();
});
}
Worker class:
public class Worker : IHostedService
{
private readonly IBusControl _bus;
public Worker(IBusControl bus)
{
_bus = bus;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _bus.StartAsync(cancellationToken).ConfigureAwait(false);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _bus.StopAsync(cancellationToken);
}
}
In your code example, you aren't adding the saga, or configuring the saga on a receive endpoint. The consumer is a separate thing, and completely unrelated to the saga. You should be calling:
AddSagaStateMachine<OrderStateMachine, OrderState, OrderStateMachineDefinition>()
And then either using ConfigureSaga<OrderState>
or switching to ConfigureEndpoints
for the endpoint to be configured automatically.