Search code examples
masstransitsaga

Successful request/response from saga leaves Canceled message in saga skipped queue


After much toil and trial and error I managed to issue a "request" from my saga and see it handle the response. My jubilation was cut short however by the appearance of a message in my states' skipped queue. (i'm using azure service bus)

It is of type "urn:message:MassTransit.Scheduling:CancelScheduledMessage".

I am a complete newbie at with mass transit and I'm just trying to get a contrived example going.

My saga calls TaxiToRunway/TaxiingComplete. My bit of saga code

Request(()=>TaxiToRunway, config =>
        {
            config.Timeout = TimeSpan.FromSeconds(30);
        });
...
public Request<PlaneState, TaxiToRunway, TaxiingComplete> TaxiToRunway { get; private set; }
...
Initially(
            When(ReadyToDepart)
                .Then(context =>
                {
                    context.Saga.Altitude = 0;
                    context.Saga.Speed = 0;
                    context.Saga.FlightNo = context.Message.FlightNo;
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    Console.WriteLine($"Flight {context.Message.FlightNo} is ready to depart.");
                })
                .TransitionTo(Taxiing)
                .Request(TaxiToRunway,
                    (context) => context.Init<TaxiToRunway>(new {CorrelationId = context.Saga.CorrelationId}))
...
During(Taxiing, 
            Ignore(ReadyToDepart),
            
                When(TaxiToRunway.Completed)
                    .Then(x =>
                    {
                        x.ToString();
                    })
                    .TransitionTo(TakingOff),

With a debugger attached I hit the x.ToString() line.

The consumer (in a different host):

public class TaxiToRunwayConsumer: IConsumer<TaxiToRunway>
{
    public async Task Consume(ConsumeContext<TaxiToRunway> context)
    {
        await context.RespondAsync<TaxiingComplete>(new
        {
            context.Message.CorrelationId
        });
    }
}

Saga startup config:

cfg.AddSagaStateMachine<PlaneStateMachine, PlaneState>()
                .MessageSessionRepository();
            
            cfg.AddServiceBusMessageScheduler();

            cfg.UsingAzureServiceBus((context, sbCfg) =>
            {
                var connectionString = appConfig.ServiceBus.ConnectionString;
                sbCfg.Host(connectionString);
                
                EndpointConvention.Map<TaxiToRunway>(new Uri("sb://xxx.servicebus.windows.net/taxi-to-runway"));
                
                sbCfg.UseServiceBusMessageScheduler();
                
                sbCfg.ReceiveEndpoint("plane-state", e =>
                {
                    e.UseInMemoryOutbox();
                    e.RequiresSession = true;
                    e.PrefetchCount = 50;
                    e.MaxConcurrentCalls = 50;
                    e.ConfigureSaga<PlaneState>(context);
                });
                
                sbCfg.ConfigureEndpoints(context);
            });

I can see this in the log output:

dbug: MassTransit.Messages[0]
      SEND sb://dbpdf-us-dev-sam.servicebus.windows.net/plane-state 80d90000-5d7b-2cf0-7a6b-08da0fd3e7b7 MassTransit.Scheduling.CancelScheduledMessage

Am I supposed to be handling this as an event??

Learning curve on this sure is steep! My question is what do I need to do to not have these messages go to skipped?


Solution

  • So, the reason this doesn't work:

    1. The message session saga repository can only correlate by the SessionId, since it's session-stored data.
    2. The requestId, therefore, MUST equal the saga instance correlationId (aka, the SessionId)
    3. The timeout message, sent by the request, gets a tokenId based upon the sequence number of the scheduled message
    4. Which isn't saved anywhere
    5. So the request timeout isn't canceled

    The proper approach, in this scenario, is to use a Request/Response that doesn't have a timeout and use a separate Schedule to schedule the timeout yourself.