Search code examples
c#asp.net-coremasstransitautomatonymous

Using saga event to react to a a message published in a consumer


I'm putting together a proof of concept using Mass Transit with RabbitMq and Automatonymous in an asp.net core 2.1 application. I'm using EntityFramework core with Postgres for persistence.

What I'm trying to do is kick off a saga when a request is made to a http rest api and return the result once the saga completes. What I'm doing is:

  • hook up an event to start my saga using an interface that has a request/response client
  • in the saga publish a message that is consumed by a consumer
  • in the consumer publish a message that corresponds to another event in my saga
  • return a response from my saga on completion and finalize

This is my code:

my interfaces

public interface IStartSagaRequest
{
    Guid CorrelationId { get; set; }
    string Name {get; set;}
}

public interface IStartSagaResponse
{
    Guid CorrelationId { get; set; }
    bool DidComplete {get; set;}
}

public IDoOperationRequest
{
    Guid CorrelationId { get; set; }
}

public IOperationComplete
{
    Guid CorrelationId { get; set; }
    bool OperationSuccessful {get; set;}
}

My saga instance

public class DoOperationSaga : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public Name { get; set; }
    public string CurrentState { get; set; }
}

concrete implementation of IDoOperationRequest used to publish in state machine

public class DoOperationRequestImpl : IDoOperationRequest
{
    public Guid CorrelationId { get; set; }
}

concrete implementation of IStartSagaResponse used to publish in state machine

public class StartSagaResponse : IStartSagaResponse
{
    public Guid CorrelationId { get; set; }
    public bool DidComplete {get; set;}
}

My state machine

public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
    public State OperationPending { get; private set; }
    public State Complete { get; private set; }


    public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
    public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }


    public ProcessOperationStateMachine()
    {
        InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);

        Event(() => StartSagaRequestEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                    context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
        });

        Event(() => OperationCompleteEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                context => context.Message.CorrelationId);
        });


        Initially(
            When(StartSagaRequestEvent)
                .Then(context =>
                {
                    context.Instance.CorrelationId = context.Data.CorrelationId;
                    context.Instance.Name = context.Data.Name;
                    context.Publish(new DoOperationRequestImpl
                    {
                        CorrelationId = context.Data.CorrelationId
                    });

                })
                .TransitionTo(OperationPending)
        );

        During(OperationPending,
            When(OperationCompleteEvent)
                .Then(context =>
                {
                    // I'm just doing this for debugging
                    context.Instance.Name = "changed in operationComplete";
                })
                .ThenAsync(context => context.RespondAsync(new StartSagaResponse 
                { 
                    CorrelationId = context.Data.CorrelationId,
                    DidComplete = true
                }))
                .Finalize());

}

My consumer:

public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{

    public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
    {
       await context.Publish<IOperationComplete>(new
       {
          CorrelationId = context.Message.CorrelationId,
          OperationSuccessful = true
       });
    }
}

How I am hooking things up in DI in Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    stateMachine = new ProcessOperationStateMachine();

    SagaDbContextFactory factory = new SagaDbContextFactory();
    EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);

    services.AddMassTransit(x =>
    {

        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            sbc.ReceiveEndpoint(host, "do-operation", ep =>
            {
                ep.UseMessageRetry(c => c.Interval(2, 100));
                ep.StateMachineSaga(stateMachine, repository);
                ep.Durable = false;
            });

            sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
            {
                ep.Consumer(() => new DoOperationRequestConsumer());
                ep.Durable = false;
            });
        }));
        x.AddConsumer<DoOperationRequestConsumer>();
    });

    services.AddScoped<DoOperationRequestConsumer>();

    services.AddScoped(p =>
        p.GetRequiredService<IBusControl>()
            .CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
                new Uri("rabbitmq://localhost/do-operation?durable=false"),
                TimeSpan.FromSeconds(30)));

}

and making the request in my controller:

public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
    Name = "from the controller",
    CorrelationId = guid
});

What I am seeing is my state machine does get started. When(StartSagaRequestEvent) does get hit and a DoOperationRequest message gets published. DoOperationRequestConsumer does get the message and publishes a IOperationComplete message. However that is where it stops. My IOperationCompleteEvent in my state machine does not get called. When I look in the database I can see my saga instance gets created with the guid and the CurrentState is set to OperationPending. When I look at my rabbitmq management dashboard I see a message get published after my DoOperationRequestConsumer does it's IOperationComplete message publish. I'm just not seeing the state machine consume the IOperationComplete message being published by the comsumer. When I set a breakpoint and inspect the message in the Consumer I do see that the CorrelationId is set to same value of the CorrelationId of the saga.

I also tried to explicitly use the "do-operation" queue in the consumer:

public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
    ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));

    await sendEndpoint.Send<IOperationComplete>(new
    {
      CorrelationId = context.Message.CorrelationId,
      OperationSuccessful = true
    });
}

but still wasn't able to make the connection.

I've been banging my head against this all day and am not sure what I'm missing here. If anyone could give me some suggestions on what I could be doing wrong I would really appreciate it, again sorry for the wall of text, I know it's allot to read but I wanted to be clear about what I was doing. Thanks much!


Solution

  • Your event correlationId seems suspect, it should be like this:

    Event(() => StartSagaRequestEvent, eventConfigurator =>
    {
        eventConfigurator.CorrelateById(context => context.Message.CorrelationId)
            .SelectId(context => context.Message.CorrelationId);
    });
    

    That way it is initializing to the CorrelationId of the message.

    Unrelated, but your endpoint should use the extension method for your container:

    sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
    {
        ep.ConfigureConsumer<DoOperationRequestConsumer>();
        ep.Durable = false;
    });
    

    And use the new request client, by configuring it in the extensions as well.

    x.AddRequestClient<IDoOperationRequest>(new Uri("rabbitmq://localhost/do-operation?durable=false"));
    

    Also, in your initial condition, this line should be removed:

    context.Instance.CorrelationId = context.Data.CorrelationId;