Search code examples
asp.net-coremasstransithangfire

Schedule MassTransit Publish in ASP.NET Core


I have an ASP.NET Core Web API publishing messages using MassTransit and I am adding Hangfire for scheduling. I would also like to be able to view the Hangfire dashboard. With the relevant configuration and publishing code, shown below, the messages aren't being scheduled. I can see the Hangfire dashboard and tables in the database. Does anyone know of a complete example doing this or has any suggestions?

Configuration:

builder.Services.AddHangfireServer()
                .AddHangfire(configuration => configuration.UseSqlServerStorage(...);

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((_, cfg) =>
    {
        cfg.Host(...);

        cfg.UsePublishMessageScheduler();
    });

    x.AddPublishMessageScheduler();
    x.AddHangfireConsumers();
});

app.UseHangfireDashboard()
   .UseHealthChecks("/health");

Publishing:

public class PublishingClass
{
    IMessageScheduler MessageScheduler { get; init; }

    public PublishingClass(IMessageScheduler messageScheduler)
    {
        MessageScheduler = messageScheduler;
    }

    public async Task Publish(Command command, DateTime sendDate)
    {
        await MessageScheduler.SchedulePublish(sendDate, command);
    }
}

Solution

  • You could try this working sample.
    Package
    Hangfire MassTransit MassTransit.Hangfire MassTransit.RabbitMQ Microsoft.Data.SqlClient

    program.cs

    builder.Services.AddHangfire(x => x.UseSqlServerStorage("Data Source=192.168.2.68;Initial Catalog=hangfireDB;User ID=sa;Password=xxxxx;TrustServerCertificate=True"));
    
    builder.Services.AddMassTransit(x =>
    {
        x.AddConsumers(typeof(Program).Assembly);
    
        x.AddPublishMessageScheduler();
    
        x.UsingRabbitMq((context, config) =>
        {
            config.UsePublishMessageScheduler();
            config.UseHangfireScheduler();
    
            config.Host("rabbitmq://xx.xxx.xxx.xxxx:5672", hostconfig =>
            {
                hostconfig.Username("guest");
                hostconfig.Password("guest");
            });
            config.ConfigureEndpoints(context);
        });
    });
    
    //builder.Services.AddOptions<MassTransitHostOptions>().Configure(options =>
    //    {
    //        options.WaitUntilStarted = true;
    //    });
    ...
    app.UseHangfireDashboard();
    

    MyModel.cs

        public record MyModel
        {
            public string value { get; init; }
        }
    

    Consumer1.cs

        public class Consumer1 : IConsumer<MyModel>  
        {
            public Task Consume(ConsumeContext<MyModel> context)
            {
                Console.WriteLine("Now is "+DateTime.Now.ToString());
                Console.WriteLine($"MassTransit.Consumer1 :{JsonSerializer.Serialize(context.Message)}");
                return Task.CompletedTask;
            }
        }
    

    Publish Controller

        [ApiController]
        public class ValuesController : ControllerBase
        {
    
            private readonly IMessageScheduler _scheduler;
    
            public ValuesController(IBus publishEndpoint,IMessageScheduler scheduler)
            {
                this._scheduler = scheduler;
            }
            [HttpGet("pub")]
            public async Task pub()
            {
                await _scheduler.SchedulePublish(DateTime.UtcNow.AddSeconds(10), new MyModel() { value = "message at "+DateTime.Now.ToString() });
            }
        }
    

    Test enter image description here