
MassTransit schedule Quartz issue


I am following Chris Patterson's tutorial and trying to do what he shows in the video, but when I schedule a message to expire the allocation, it doesn't work. Looking more carefully, I noticed that the scheduler queue in RabbitMQ does not have any consumer. I tried to solve it but could not find the problem. This is my state machine:

public class AllocationStateMachine : MassTransitStateMachine<AllocationState>
{
    public AllocationStateMachine(ILogger<AllocationStateMachine> logger)
    {

        this.InstanceState(x => x.CurrentState);
        this.ConfigureCorrelationIds();

        Schedule(() => HoldExpiration, (allocationState) => allocationState.HoldDurationToken, (config) =>
        {
            // Set a default, fixed message delay
            config.Delay = TimeSpan.FromHours(1);

            config.Received = x => x.CorrelateById(m => m.Message.AllocationId);
        });

        Initially(When(AllocationCreated)
                         .Then(context => logger.LogInformation($"AllocationCreated for AllocationId: {context.Message.AllocationId}"))
                         .Schedule(HoldExpiration,
                                context =>
                                {
                                    var msg = context.Init<AllocationHoldDurationExpired>(new { context.Message.AllocationId });
                                    return msg;
                                },
                                context => context.Message.HoldDuration)
                        .TransitionTo(Allocated));


        During(Allocated,
          When(HoldExpiration.Received)
              .Then(context =>
              {
                  logger.LogInformation("Allocation expired {AllocationId}", context.Saga.CorrelationId);
              })
              .Finalize()

         );


        //     Sets the state machine instance to Completed when in the final state. The saga
        //     repository removes completed state machine instances.

        //SetCompletedWhenFinalized();

    }

    private void ConfigureCorrelationIds()
    {
        Event(() => AllocationCreated, x => x.CorrelateById(m => m.Message.AllocationId));
    }

    public Schedule<AllocationState, AllocationHoldDurationExpired> HoldExpiration { get; set; }

    public State Allocated { get; set; }
    public State Released { get; set; }

    public Event<IAllocationCreated> AllocationCreated { get; set; }
}

and the Program.cs:

builder.Services.AddMassTransit(x =>
{
    x.AddConsumersFromNamespaceContaining<AllocateInventoryConsumer>();
    x.AddSagaStateMachine<AllocationStateMachine, AllocationState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

            r.AddDbContext<DbContext, AllocationStateDbContext>((provider, dbContextOptionBuilder) =>
            {
                dbContextOptionBuilder.UseSqlServer(builder.Configuration.GetConnectionString("WarehouseConn"), m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(AllocationStateDbContext)}");
                });
            });
        });

    Uri schedulerEndpoint = new Uri("queue:scheduler");
    x.AddMessageScheduler(schedulerEndpoint);

    x.UsingRabbitMq((context, cfg) =>
    {
        // for configuring Quartz endpoint
        cfg.UseMessageScheduler(schedulerEndpoint);
        //cfg.ReceiveEndpoint("scheduler", endpoint =>
        //{
        //    cfg.UseMessageScheduler(schedulerEndpoint);
        //});

        cfg.ConfigureEndpoints(context);
        cfg.Host(builder.Configuration["RabbitMQ:Host"], "/", h =>
        {
            h.Username(BusConstants.Username);
            h.Password(BusConstants.Password);
        });
    });
});

and these are my packages:

<ItemGroup>
    <PackageReference Include="MassTransit" Version="8.0.1" />
    <PackageReference Include="MassTransit.Analyzers" Version="8.0.1" />
    <PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
    <PackageReference Include="MassTransit.RabbitMQ" Version="8.0.1" />
    <PackageReference Include="MassTransit.EntityFrameworkCore" Version="8.0.1" />
    <PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.4" />
    <PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.4">
        <PrivateAssets>all</PrivateAssets>
        <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
    <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.4" />
    <PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="6.0.4">
        <PrivateAssets>all</PrivateAssets>
        <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
    <PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
    <PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
</ItemGroup>

As I said, the scheduler queue was created, but it does not have any consumer:

[Image: the scheduler queue situation]

What is the problem?


Solution

  • You aren't running Quartz on any of the endpoints. You'd need to have the actual Quartz consumers configured, along with Quartz itself, for it to work. There is a sample available that shows how to configure it. Or you can configure it using the latest syntax, which is only shown in the unit tests at this point.
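
As a rough illustration of what "running Quartz" inside the service could look like, here is a minimal sketch. It is not taken from the sample mentioned above. It assumes the MassTransit.Quartz and Quartz.Extensions.Hosting packages are referenced and that the MassTransit version is new enough to provide AddQuartzConsumers (8.0.1 from the question may be too old). The host name and credentials are placeholders, and the saga and consumer registrations from the question are left out for brevity.

```csharp
using System;
using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Quartz;

var builder = WebApplication.CreateBuilder(args);

// Register Quartz itself so there is an actual scheduler in the process.
builder.Services.AddQuartz(q =>
{
    // Lets Quartz resolve jobs from the DI container
    // (newer Quartz versions do this by default).
    q.UseMicrosoftDependencyInjectionJobFactory();
});

builder.Services.AddMassTransit(x =>
{
    // Same address the question already uses for scheduling.
    var schedulerEndpoint = new Uri("queue:scheduler");
    x.AddMessageScheduler(schedulerEndpoint);

    // Assumption: this registers the Quartz consumers on the "scheduler" queue,
    // which is the missing piece (the queue existed but nothing consumed from it).
    x.AddQuartzConsumers(o => o.QueueName = "scheduler");

    x.UsingRabbitMq((context, cfg) =>
    {
        // Placeholder host and credentials, not the values from the question.
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.UseMessageScheduler(schedulerEndpoint);
        cfg.ConfigureEndpoints(context);
    });
});

// Start and stop the Quartz scheduler together with the host.
builder.Services.AddQuartzHostedService(o => o.WaitForJobsToComplete = true);

var app = builder.Build();
app.Run();
```

With something like this in place, the scheduler queue should show a consumer in the RabbitMQ management UI. Note that Quartz uses an in-memory job store unless a persistent one is configured, so scheduled messages would not survive a restart in this sketch.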