.net-core, masstransit

Why is there a 1 min delay between fetching new messages from MassTransit saga queue?


I am using MassTransit 8.3.4, .NET 7 and Azure Service Bus "Standard" pricing tier.

I have noticed that if I:

  1. Send many messages (50) to the saga queue while the saga is not running, each with a unique correlation id.
  2. Run an ASP.NET Web API hosting the saga with the default concurrency settings (MaxConcurrentSessions=8) and ASB as the saga repository (sessions enabled).

Then I get the following results:

8 messages are processed in parallel.

<1 min hold - no saga instance processes anything>

8 messages are processed in parallel.

<1 min hold - no saga instance processes anything>

...

Question: why is there a 1 min delay? What am I missing? Why, after a saga instance completes, is another message not pulled from the queue immediately?

Source code:

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();

var queueAName = "commandA";
builder.Services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<SimpleSaga, SimpleSagaState>().MessageSessionRepository();

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.ReceiveEndpoint(queueAName, c =>
        {
            c.RequiresSession = true;
            c.ConfigureSaga<SimpleSagaState>(context);
        });

        cfg.Host("");
    });
});
var app = builder.Build();
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();


public class SimpleSaga : MassTransitStateMachine<SimpleSagaState>
{
    public SimpleSaga()
    {
        InstanceState(x => x.CurrentState);

        Event(() => InitialCommandReceived, x => x.CorrelateById(context => context.CorrelationId.Value));

        Initially(
            When(InitialCommandReceived)
                .Then(c => Console.WriteLine($"[{DateTime.UtcNow.TimeOfDay}]: InitialCommandReceived: received with CorrelationId: {c.CorrelationId}."))
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public Event<CommandA> InitialCommandReceived { get; set; }
}

public class SimpleSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}

Output:

[19:19:02.4539790]: InitialCommandReceived: received with CorrelationId: 538f04fd-62b1-4885-8be2-4f1e1a143539.

[19:19:02.4540137]: InitialCommandReceived: received with CorrelationId: 87077e87-447d-422e-b078-b73236c9d23e.

[19:19:02.4540267]: InitialCommandReceived: received with CorrelationId: 3d9297d8-5117-4bf3-8bb6-769b045ba68e.

[19:19:02.4539898]: InitialCommandReceived: received with CorrelationId: 8a02518f-c6f9-4fcb-a605-dede410e8610.

[19:19:02.4545988]: InitialCommandReceived: received with CorrelationId: 7773cd68-a58d-420f-bb6a-23fe67c5fcf1.

[19:19:02.5554364]: InitialCommandReceived: received with CorrelationId: aa12eb8a-fa9f-4b52-ae69-917ba04bdec3.

[19:19:02.6620557]: InitialCommandReceived: received with CorrelationId: b8ca11e2-cdcb-4388-8263-6e8df22f7cfb.

[19:19:02.7596565]: InitialCommandReceived: received with CorrelationId: 722e6aa7-4fb6-43c1-87a2-b09294e7f3c1.

--------------- 1 min -------------------------

[19:20:03.5855011]: InitialCommandReceived: received with CorrelationId: f1cdd00a-dd83-49bc-bf0d-33ab3052673c.

[19:20:03.5854979]: InitialCommandReceived: received with CorrelationId: da7c2aa7-c676-447d-8f5f-42d1e0546b7b.

[19:20:03.5856126]: InitialCommandReceived: received with CorrelationId: 264c9e84-b302-4511-a00a-6e6241e091e8.

[19:20:03.5860874]: InitialCommandReceived: received with CorrelationId: fc8f1f03-4103-4501-b406-88cc1c59f528.

[19:20:03.5893627]: InitialCommandReceived: received with CorrelationId: 779f42ae-8986-4fdb-b3c3-28113652717c.

[19:20:03.6894042]: InitialCommandReceived: received with CorrelationId: 7672425b-e0fc-4584-8a96-ff2d17689df8.

[19:20:03.8924069]: InitialCommandReceived: received with CorrelationId: 1182d62a-5657-4fb4-9877-efbc31155f01.

[19:20:03.9992448]: InitialCommandReceived: received with CorrelationId: 8fc8ecfe-5c14-4279-af71-95e48a1afa64.

--------------- 1 min -------------------------

[19:21:04.8731504]: InitialCommandReceived: received with CorrelationId: 3cf94c6a-2e5b-4740-9407-e12ae1ed9555.

[19:21:04.8732114]: InitialCommandReceived: received with CorrelationId: 478921d9-9a1a-49f1-a99d-2e5613625b74.

[19:21:05.2777634]: InitialCommandReceived: received with CorrelationId: 497d815f-466f-4baf-89cb-618e0bf2e9d3.

[19:21:05.2781711]: InitialCommandReceived: received with CorrelationId: d142326f-6b2a-4792-8cf8-0f0914aeea36.

[19:21:05.2782202]: InitialCommandReceived: received with CorrelationId: 718bce5a-c8e1-4163-a0df-ba795d8fc3d3.

[19:21:05.4816332]: InitialCommandReceived: received with CorrelationId: c9bfc039-7fdc-4cdf-b496-0e01d781b392.

[19:21:05.4816541]: InitialCommandReceived: received with CorrelationId: 4aa1a967-c9a3-4233-9d63-796b07af3aab.

[19:21:05.5837755]: InitialCommandReceived: received with CorrelationId: 7dfc6640-856a-4ef8-b5cc-c3469a6bb5bc.


Solution

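  • You can configure the session settings to suit your requirements.

    With sessions enabled, a receiver that has accepted a session keeps it after processing the last available message and waits for more messages on that session until the idle timeout elapses. Each message here has a unique correlation id, so presumably each one is its own session: all 8 session slots then sit idle until the timeout (about one minute, judging by the output above) before the next 8 sessions are accepted.

    If you want less time spent idle before a session is released, consider lowering the SessionIdleTimeout: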

    cfg.ReceiveEndpoint(queueAName, c =>
    {
        c.RequiresSession = true;
        c.SessionIdleTimeout = TimeSpan.FromSeconds(3);
        c.ConfigureSaga<SimpleSagaState>(context);
    });
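
To get a feel for how much the idle timeout matters, here is a rough back-of-the-envelope sketch. It assumes one message per session and that every batch of sessions is held for the full idle timeout before the next batch is accepted, which is what the output in the question suggests. EstimateIdleTime is a hypothetical helper written for this illustration, not a MassTransit or Azure Service Bus API.

```csharp
using System;

// Hypothetical helper, not part of MassTransit: estimates the total idle time
// accumulated while draining the queue. Assumes one message per session and that
// each batch of sessions is held for the full idle timeout before release.
static TimeSpan EstimateIdleTime(int messages, int concurrentSessions, TimeSpan idleTimeout)
{
    // Ceiling division: number of batches needed to drain the queue.
    int batches = (messages + concurrentSessions - 1) / concurrentSessions;

    // One idle gap between each pair of consecutive batches.
    int gaps = Math.Max(0, batches - 1);
    return TimeSpan.FromTicks(idleTimeout.Ticks * gaps);
}

// 50 messages, 8 sessions at a time: 7 batches, so 6 idle gaps.
Console.WriteLine(EstimateIdleTime(50, 8, TimeSpan.FromMinutes(1))); // prints 00:06:00
Console.WriteLine(EstimateIdleTime(50, 8, TimeSpan.FromSeconds(3))); // prints 00:00:18
```

Under these assumptions, lowering the timeout from one minute to 3 seconds cuts the accumulated idle time for the 50 messages from about 6 minutes to about 18 seconds. A very short timeout may release sessions that are still expecting follow-up messages, so pick a value that matches how closely related messages arrive.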