I am following Chris Patterson's tutorial and trying to do what he shows in the video, but when I schedule a message to expire the allocation,
it doesn't work. When I looked carefully, I noticed that the scheduler queue in RabbitMQ
does not have any consumer. I tried to solve it but could not find the problem. This is my state machine:
/// <summary>
/// Saga state machine for an allocation. When <see cref="AllocationCreated"/> arrives,
/// it schedules an <c>AllocationHoldDurationExpired</c> message and moves to
/// <see cref="Allocated"/>; when that scheduled message is received while in
/// <see cref="Allocated"/>, the instance is finalized.
/// </summary>
/// <remarks>
/// The scheduled message is only delivered if a message scheduler is actually running
/// and consuming from the endpoint the bus is configured to schedule through.
/// </remarks>
public class AllocationStateMachine : MassTransitStateMachine<AllocationState>
{
    /// <summary>
    /// Declares the state property, event correlation, the hold-expiration schedule,
    /// and the behaviors for the initial and <see cref="Allocated"/> states.
    /// </summary>
    /// <param name="logger">Logger used to trace allocation creation and expiration.</param>
    public AllocationStateMachine(ILogger<AllocationStateMachine> logger)
    {
        this.InstanceState(x => x.CurrentState);
        this.ConfigureCorrelationIds();

        Schedule(() => HoldExpiration, allocationState => allocationState.HoldDurationToken, config =>
        {
            // Default, fixed message delay; the Schedule activity below supplies
            // a per-message delay taken from the incoming message instead.
            config.Delay = TimeSpan.FromHours(1);
            config.Received = x => x.CorrelateById(m => m.Message.AllocationId);
        });

        Initially(
            When(AllocationCreated)
                // Message template instead of an interpolated string, so AllocationId is
                // captured as a structured property (same rendered text as before).
                .Then(context => logger.LogInformation(
                    "AllocationCreated for AllocationId: {AllocationId}",
                    context.Message.AllocationId))
                .Schedule(
                    HoldExpiration,
                    context => context.Init<AllocationHoldDurationExpired>(new { context.Message.AllocationId }),
                    context => context.Message.HoldDuration)
                .TransitionTo(Allocated));

        During(Allocated,
            When(HoldExpiration.Received)
                .Then(context => logger.LogInformation(
                    "Allocation expired {AllocationId}",
                    context.Saga.CorrelationId))
                .Finalize());

        // Sets the state machine instance to Completed when in the final state. The saga
        // repository removes completed state machine instances.
        //SetCompletedWhenFinalized();
    }

    /// <summary>
    /// Correlates <see cref="AllocationCreated"/> to a saga instance by the message's AllocationId.
    /// </summary>
    private void ConfigureCorrelationIds()
    {
        Event(() => AllocationCreated, x => x.CorrelateById(m => m.Message.AllocationId));
    }

    /// <summary>Schedule for the hold-expiration message; its token is stored in HoldDurationToken.</summary>
    public Schedule<AllocationState, AllocationHoldDurationExpired> HoldExpiration { get; set; }

    /// <summary>State entered after the allocation has been created and the expiration scheduled.</summary>
    public State Allocated { get; set; }

    /// <summary>Declared state; no behavior in this class transitions to it.</summary>
    public State Released { get; set; }

    /// <summary>Event raised when an <c>IAllocationCreated</c> message is received.</summary>
    public Event<IAllocationCreated> AllocationCreated { get; set; }
}
And this is my Program.cs:
// Registers MassTransit: consumers, the allocation saga (EF Core repository),
// a message scheduler that targets the "scheduler" queue, and the RabbitMQ transport.
builder.Services.AddMassTransit(registration =>
{
    registration.AddConsumersFromNamespaceContaining<AllocateInventoryConsumer>();

    registration.AddSagaStateMachine<AllocationStateMachine, AllocationState>()
        .EntityFrameworkRepository(repository =>
        {
            // Pessimistic locking; Optimistic would require a RowVersion column.
            repository.ConcurrencyMode = ConcurrencyMode.Pessimistic;

            repository.AddDbContext<DbContext, AllocationStateDbContext>((provider, optionsBuilder) =>
            {
                optionsBuilder.UseSqlServer(
                    builder.Configuration.GetConnectionString("WarehouseConn"),
                    sqlOptions =>
                    {
                        sqlOptions.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                        sqlOptions.MigrationsHistoryTable($"__{nameof(AllocationStateDbContext)}");
                    });
            });
        });

    // NOTE: this only tells the bus WHERE to send schedule requests. Nothing in this
    // registration consumes from the "scheduler" queue (no Quartz consumers are
    // configured here), so scheduled messages sit in that queue unless another
    // service hosts the scheduler.
    var schedulerEndpoint = new Uri("queue:scheduler");
    registration.AddMessageScheduler(schedulerEndpoint);

    registration.UsingRabbitMq((context, rabbit) =>
    {
        // Route scheduling requests from this bus to the scheduler endpoint.
        rabbit.UseMessageScheduler(schedulerEndpoint);

        // Earlier attempt, left disabled:
        //rabbit.ReceiveEndpoint("scheduler", endpoint =>
        //{
        // rabbit.UseMessageScheduler(schedulerEndpoint);
        //});

        rabbit.ConfigureEndpoints(context);

        rabbit.Host(builder.Configuration["RabbitMQ:Host"], "/", host =>
        {
            host.Username(BusConstants.Username);
            host.Password(BusConstants.Password);
        });
    });
});
And these are my packages:
<!-- NuGet packages referenced by the project: MassTransit (core, analyzers, ASP.NET Core,
     RabbitMQ, EF Core), Entity Framework Core for SQL Server, container tooling and Swagger.
     Note: no Quartz-related package appears in this list. -->
<ItemGroup>
<PackageReference Include="MassTransit" Version="8.0.1" />
<PackageReference Include="MassTransit.Analyzers" Version="8.0.1" />
<!-- NOTE(review): this package is at 7.3.1 while the other MassTransit packages are at 8.0.1;
     confirm the version mix is intended and whether this reference is still needed. -->
<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.0.1" />
<PackageReference Include="MassTransit.EntityFrameworkCore" Version="8.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
</ItemGroup>
As I said, the scheduler queue was created, but it does not have any consumer.
What is the problem?
You aren't running Quartz on any of the endpoints. For scheduling to work, you need to have the actual Quartz consumers configured, along with Quartz itself. There is a sample available that shows how to configure it. Alternatively, you can configure it using the latest syntax, which is only shown in the unit tests at this point.