Search code examples
c#asp.net-coremasstransitautomatonymous

MassTransit Automatonymous - State not changing when a message is Sent


I am trying to figure why "Sending" a message does not invoke state machine, but if I "Publish" a message, it works and I can see the state changing.

Following is my code, it is similar to the documentation, except that I am trying to "Send" a message.

Components

State Machine:
public class OrderState: SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public int CurrentState { get; set; }
    public DateTime? OrderDate { get; set; }
}

public class OrderStateMachine: MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
    public State Completed { get; private set; }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Event<OrderAccepted> OrderAccepted { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState, Submitted, Accepted, Completed);
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                .Then(context => context.Instance.OrderDate = context.Data.OrderDate)
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
        
        During(Accepted,
            Ignore(SubmitOrder));

        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));
        
        SetCompleted(async instance =>
        {
            var currentState = await this.GetState(instance);
            return Completed.Equals(currentState);
        });
    }
}
Contracts:
public record SubmitOrder(Guid OrderId, DateTime? OrderDate);
public record OrderAccepted(Guid OrderId);
public record OrderCompleted(Guid OrderId);
Consumers:
public class SubmitOrderConsumer: IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await Task.Delay(2000);
    }
}

public class SubmitOrderConsumerDefinition : ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        EndpointName = "submit-order";
    }
    
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        endpointConfigurator.ConfigureConsumeTopology = false;
    }
}

Web API

Program.cs (snippet)
// Add services to the container.
builder.Services.AddMassTransit(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();
    cfg.UsingRabbitMq((context, configurator) =>
    {
        configurator.Host("localhost", "/", hostConfigurator =>
        {
            hostConfigurator.Username("guest");
            hostConfigurator.Password("guest");
        });
    });
});
builder.Services.AddMassTransitHostedService();
builder.Services.AddControllers();

OrderController
[Route("order")]
public class OrderController : ControllerBase
{
    private readonly ISendEndpointProvider _sendEndpointProvider;
    public OrderController(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpointProvider = sendEndpointProvider;
    }
    
    [HttpPost]
    public async Task<IActionResult> SendOrder()
    {
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("exchange:submit-order"));
        await endpoint.Send(new SubmitOrder(Guid.NewGuid(), DateTime.Now));
        return Ok();
    }
}

Worker Service

Program.cs
using IHost = Microsoft.Extensions.Hosting.IHost;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddMassTransit(cfg =>
        {
            cfg.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
            cfg.AddSagaStateMachine<OrderStateMachine, OrderState>().InMemoryRepository();
            cfg.UsingRabbitMq((context, rabbitMqConfigurator) =>
            {
                rabbitMqConfigurator.Host("localhost", "/", hostConfigurator =>
                {
                    hostConfigurator.Username("guest");
                    hostConfigurator.Password("guest");
                });
                rabbitMqConfigurator.ReceiveEndpoint("saga-order", endpointConfigurator =>
                {
                    endpointConfigurator.ConfigureSaga<OrderState>(context);
                });
                rabbitMqConfigurator.ConfigureEndpoints(context);
            });
        });
        services.AddMassTransitHostedService();
        services.AddHostedService<Worker>();
    })
    .Build();

await host.RunAsync();

Then I do a POST via Postman to: http://localhost:5000/order

It does call the SubmitOrderConsumer, but for some reason, the State machine does not get invoked (it won't hit breakpoint inside the Then handler that sets the Order Date inside Initially state.). I think I am missing something that connects the two together. Any feedback is greatly appreciated. Thank you.


Solution

  • In your example, you'd want to use Publish, especially in this scenario where you have two consumers (the consumer, and the state machine) on separate endpoints (queue) that would be consuming the message. Sending directly to the exchange would only get the message to one of the endpoints.