
MassTransit RabbitMQ scheduling: message is scheduled but never sent


I'm trying to implement a scheduling mechanism with MassTransit and RabbitMQ.

I've added the configuration as stated in the docs:

Uri schedulerEndpoint = new (Constants.MassTransit.SchedulerEndpoint);

services.AddMassTransit(mtConfiguration =>
{
    mtConfiguration.AddMessageScheduler(schedulerEndpoint);

    mtConfiguration.AddSagaStateMachine<ArcStateMachine, ArcProcess>(typeof(ArcSagaDefinition))
        .Endpoint(e => e.Name = massTransitConfiguration.SagaQueueName)
        .MongoDbRepository(mongoDbConfiguration.ConnectionString, r =>
        {
            r.DatabaseName = mongoDbConfiguration.DbName;
            r.CollectionName = mongoDbConfiguration.CollectionName;
        });

    mtConfiguration.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseMessageScheduler(schedulerEndpoint);

        cfg.Host(new Uri(rabbitMqConfiguration.Host), hst =>
        {
            hst.Username(rabbitMqConfiguration.Username);
            hst.Password(rabbitMqConfiguration.Password);
        });

        cfg.ConfigureEndpoints(context);
    });
});

Then I'm sending a scheduled message using the Bus:

DateTime messageScheduleTime = DateTime.UtcNow + TimeSpan.FromMinutes(1);
await _MessageScheduler.SchedulePublish<ScheduledMessage>(messageScheduleTime, new
{
    ActivationId = context.Data.ActivationId
});

_MessageScheduler is the IMessageScheduler instance.

I can see that the scheduler queue receives the scheduled message, and the message carries the correct scheduledTime property, but it never reaches the state machine when its scheduled time arrives. It seems I'm missing something in the configuration, or some MassTransit service has not been started.

Please assist.


Solution

  • If you read the documentation, you will see that UseDelayedMessageScheduler is the proper configuration to use RabbitMQ for scheduling, and that AddDelayedMessageScheduler is the matching container-based IMessageScheduler registration.
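
As a rough sketch of what that change could look like, here is the registration from the question with the two endpoint-based scheduler calls swapped for their delayed counterparts. It reuses the question's own names (ArcStateMachine, ArcProcess, ArcSagaDefinition and the three configuration objects), which are assumed to exist as in the original code. The likely reason the original setup stalls is that AddMessageScheduler(schedulerEndpoint) and UseMessageScheduler(schedulerEndpoint) only send the request to a queue that a separate scheduler service (for example a Quartz.NET or Hangfire based one) is expected to consume; with no such service running, the message stays in that queue. As far as I know, the delayed scheduler on RabbitMQ also relies on the delayed message exchange plugin being enabled on the broker, so check that against the documentation for your MassTransit version.

```csharp
using MassTransit;

services.AddMassTransit(mtConfiguration =>
{
    // Registers IMessageScheduler in the container, backed by the
    // transport's delayed delivery instead of an external scheduler queue.
    mtConfiguration.AddDelayedMessageScheduler();

    // Saga registration is unchanged from the question.
    mtConfiguration.AddSagaStateMachine<ArcStateMachine, ArcProcess>(typeof(ArcSagaDefinition))
        .Endpoint(e => e.Name = massTransitConfiguration.SagaQueueName)
        .MongoDbRepository(mongoDbConfiguration.ConnectionString, r =>
        {
            r.DatabaseName = mongoDbConfiguration.DbName;
            r.CollectionName = mongoDbConfiguration.CollectionName;
        });

    mtConfiguration.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitMqConfiguration.Host), hst =>
        {
            hst.Username(rabbitMqConfiguration.Username);
            hst.Password(rabbitMqConfiguration.Password);
        });

        // Bus-level counterpart of AddDelayedMessageScheduler; replaces
        // cfg.UseMessageScheduler(schedulerEndpoint).
        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});
```

With this in place the SchedulePublish call from the question should not need to change, and the schedulerEndpoint Uri is no longer used.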