Search code examples
c# · entity-framework-core · rabbitmq · masstransit · automatonymous

How to configure EF Core persistence in MassTransit and Automatonymous?


I am trying to configure an Automatonymous worker implementation with EF Core as the persistence layer. I publish an event via an API and process it in a hosted service, using RabbitMQ as the transport. Unfortunately, the database does not store the state of the state machine. I applied the migrations and I can see the OrderState table, but there is no data in it after I publish the OrderSubmitted event. The event is processed properly, because I see the log message in the console, but the database table remains empty. What am I missing?

This is my code:

Event:

/// <summary>
/// Message contract for the "order submitted" event. It declares no members of
/// its own; it only extends <c>CorrelatedBy&lt;Guid&gt;</c>.
/// </summary>
public interface OrderSubmitted : CorrelatedBy<Guid> { }

Consumer:

/// <summary>
/// Consumer for <see cref="OrderSubmitted"/> messages that writes a line to the
/// console for each message it receives.
/// </summary>
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
    {
        /// <summary>
        /// Writes the correlation id of the received message to standard output.
        /// </summary>
        /// <param name="context">Consume context carrying the received message.</param>
        /// <returns>An already-completed task; the work is done synchronously.</returns>
        public Task Consume(ConsumeContext<OrderSubmitted> context)
        {
            var logLine = $"Order with id {context.Message.CorrelationId} has been submitted.";
            Console.Out.WriteLine(logLine);

            return Task.CompletedTask;
        }
    }

My saga instance:

/// <summary>
/// Saga instance for <c>OrderStateMachine</c>: one correlation id plus the
/// name of the state the instance is currently in.
/// </summary>
public class OrderState :
    SagaStateMachineInstance
    {
        /// <summary>Identifier that ties incoming messages to this instance.</summary>
        public Guid CorrelationId { get; set; }

        /// <summary>Current state, held as a string (see <c>InstanceState</c> in the state machine).</summary>
        public string CurrentState { get; set; }
    }

State machine:

/// <summary>
/// State machine over <see cref="OrderState"/> with a single declared state
/// (<see cref="Submitted"/>) and a single event (<see cref="OrderSubmitted"/>).
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
    {
        /// <summary>State entered once an order has been submitted.</summary>
        public State Submitted { get; private set; }

        /// <summary>Event raised by an <c>OrderSubmitted</c> message.</summary>
        public Event<OrderSubmitted> OrderSubmitted { get; set; }

        /// <summary>
        /// Declares the event, the property that holds the instance state, and the transitions.
        /// </summary>
        public OrderStateMachine()
        {
            // No explicit correlation expression is supplied for the event here.
            Event(() => OrderSubmitted);
            
            // The current state is kept in OrderState.CurrentState.
            InstanceState(x => x.CurrentState);
            
            // From the initial state, OrderSubmitted moves the instance to Submitted.
            Initially(
                When(OrderSubmitted)
                    .TransitionTo(Submitted));
            
            // The same transition is also declared via DuringAny, so a repeated
            // OrderSubmitted keeps the instance in Submitted.
            DuringAny(
                When(OrderSubmitted)
                    .TransitionTo(Submitted));
        }
    }
/// <summary>
/// Saga definition for <see cref="OrderState"/>; its only customisation is the
/// concurrent message limit.
/// </summary>
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
    {
        /// <summary>Sets <c>ConcurrentMessageLimit</c> to 15.</summary>
        public OrderStateMachineDefinition() => ConcurrentMessageLimit = 15;
    }

DbContext:

/// <summary>
/// EF Core context for saga persistence. It exposes exactly one saga class
/// map, <see cref="OrderStateMap"/>.
/// </summary>
public class OrderStateDbContext : SagaDbContext
    {
        /// <summary>Passes the supplied options straight to the base context.</summary>
        /// <param name="options">EF Core options for this context.</param>
        public OrderStateDbContext(DbContextOptions options)
            : base(options)
        {
        }

        /// <summary>Saga class maps for this context: a single <see cref="OrderStateMap"/>.</summary>
        protected override IEnumerable<ISagaClassMap> Configurations
            => new ISagaClassMap[] { new OrderStateMap() };
    }
/// <summary>
/// Entity mapping for <see cref="OrderState"/>; limits the length of the
/// <c>CurrentState</c> column.
/// </summary>
public class OrderStateMap : SagaClassMap<OrderState>
    {
        /// <summary>Caps <c>CurrentState</c> at 64 characters.</summary>
        /// <param name="entity">Builder for the <see cref="OrderState"/> entity.</param>
        /// <param name="model">Model builder; not used by this mapping.</param>
        protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
            => entity.Property(x => x.CurrentState).HasMaxLength(64);
    }

Program class:

/// <summary>
/// Entry point of the worker: builds a generic host that registers the EF Core
/// context, MassTransit over RabbitMQ, and the <c>Worker</c> hosted service.
/// </summary>
public class Program
    {
        // Connection string for the database used by OrderStateDbContext.
        private const string ConnectionString =
            "Server=(localdb)\\mssqllocaldb;Database=OrderState;Trusted_Connection=True;";

        /// <summary>Builds the host from the command-line arguments and runs it.</summary>
        public static void Main(string[] args)
        {
            var host = CreateHostBuilder(args).Build();
            host.Run();
        }

        /// <summary>
        /// Creates the host builder and registers all services.
        /// </summary>
        /// <param name="args">Command-line arguments forwarded to the default builder.</param>
        /// <returns>The configured, not yet built, host builder.</returns>
        public static IHostBuilder CreateHostBuilder(string[] args)
        {
            return Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    // EF Core context: migrations live in this assembly and use a
                    // history table named after the context.
                    services.AddDbContext<OrderStateDbContext>(options =>
                        options.UseSqlServer(ConnectionString, sql =>
                        {
                            sql.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                            sql.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                        }));

                    services.AddMassTransit(registration =>
                    {
                        // Saga repository for OrderState, backed by the context registered above.
                        registration.AddSagaRepository<OrderState>()
                            .EntityFrameworkRepository(repository =>
                            {
                                repository.ExistingDbContext<OrderStateDbContext>();
                                repository.LockStatementProvider = new SqlServerLockStatementProvider();
                            });

                        registration.AddConsumer<OrderSubmittedConsumer>();

                        registration.UsingRabbitMq((ctx, bus) =>
                        {
                            bus.Host("amqp://guest:guest@localhost:5672");
                            bus.ReceiveEndpoint("order-queue", endpoint =>
                            {
                                endpoint.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
                                // I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
                            });
                        });
                    });

                    services.AddHostedService<Worker>();
                });
        }
    }

Worker class:

/// <summary>
/// Hosted service that starts the bus when the host starts and stops it when
/// the host shuts down.
/// </summary>
public class Worker : IHostedService
    {
        private readonly IBusControl _bus;

        /// <summary>Stores the bus control whose lifetime this service manages.</summary>
        /// <param name="bus">Bus control to start and stop.</param>
        public Worker(IBusControl bus) => _bus = bus;

        /// <summary>Starts the bus and completes once the start has finished.</summary>
        /// <param name="cancellationToken">Token passed through to the bus start call.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
            => await _bus.StartAsync(cancellationToken).ConfigureAwait(false);

        /// <summary>Stops the bus; the bus's own task is returned to the caller.</summary>
        /// <param name="cancellationToken">Token passed through to the bus stop call.</param>
        public Task StopAsync(CancellationToken cancellationToken)
            => _bus.StopAsync(cancellationToken);
    }

Solution

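  • Your code registers a saga repository and a consumer, but it never adds the saga state machine itself, and it never configures the saga on a receive endpoint. The consumer is a separate thing, completely unrelated to the saga, which is why the message is logged to the console but nothing is written to the OrderState table. Inside AddMassTransit you should be calling: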

    AddSagaStateMachine<OrderStateMachine, OrderState, OrderStateMachineDefinition>()

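    And then either call ConfigureSaga<OrderState> on the receive endpoint (for example, c.ConfigureSaga<OrderState>(ctx) next to the existing ConfigureConsumer call), or switch to ConfigureEndpoints (cfg.ConfigureEndpoints(ctx)) so that the endpoint is configured automatically.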