Search code examples
autofacmasstransit

MassTransit - consumers are not registered and activated


I'm trying to register consumers with MassTransit, but without success. I registered MassTransit with Autofac using the module approach.

Firstly - I created some simple message:

/// <summary>
/// Minimal message contract carrying a single piece of text.
/// </summary>
public class SimpleMessage
{
    /// <summary>The text carried by the message.</summary>
    public string msg
    {
        get;
        set;
    }
}

and I've managed to send it to the queue:

// Look up the send endpoint for the "queueTest" queue, then send one message to it.
var queueAddress = new Uri("queue:queueTest");
var sendEndpoint = await _busControl.GetSendEndpoint(queueAddress);

var message = new SimpleMessage { msg = "test" };
await sendEndpoint.Send(message);

Then I created a consumer:

/// <summary>
/// Consumes <see cref="SimpleMessage"/> messages and logs their text.
/// </summary>
public class SimpleMessageConsumer : IConsumer<SimpleMessage>
{
    private readonly ILogger _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger used to report each received message.</param>
    public SimpleMessageConsumer(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>Logs the text of the received message.</summary>
    /// <param name="context">Consume context wrapping the received <see cref="SimpleMessage"/>.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task Consume(ConsumeContext<SimpleMessage> context)
    {
        // Log the payload text. Interpolating context.Message itself would only print
        // the type name, because SimpleMessage does not override ToString().
        _logger.Info($"got msg from queue: {context.Message.msg}");

        // Nothing is awaited here, so return a completed task instead of marking
        // the method async (which would raise compiler warning CS1998).
        return Task.CompletedTask;
    }
}

But it doesn't run when a message appears in the queue. My configuration is:

// Autofac module that registers the bus settings and the MassTransit bus.
// This is the configuration from the question, in which the consumer is not invoked.
public class BusModule : Module
{
    // Adds the registrations to the supplied Autofac container builder.
    protected override void Load(ContainerBuilder builder)
    {
        // Bus settings are resolved through the IBusSettings abstraction.
        builder.RegisterType<BusSettings>().As<IBusSettings>();

        builder.AddMassTransit(cfg =>
        {
            // Registers the consumer together with its definition in the MassTransit registration.
            cfg.AddConsumer<SimpleMessageConsumer, SimpleMessageConsumerDefinition>();

            // Registers a hand-built bus directly on the underlying container builder.
            // NOTE(review): this bus is created through Bus.Factory rather than through cfg,
            // so it presumably does not use the consumer registration above - confirm.
            cfg.Builder.Register(context =>
            {
                var busSettings = context.Resolve<IBusSettings>();
                var logger = context.Resolve < ILogger >();
                var busControl = Bus.Factory.CreateUsingRabbitMq(bus =>
                {
                    // Queue/exchange options are copied from the settings onto the bus configurator.
                    bus.AutoDelete = busSettings.AutoDelete;
                    bus.Durable = busSettings.Durable;
                    bus.Exclusive = busSettings.Exclusive;
                    bus.ExchangeType = busSettings.Type;

                    //bus.UseNServiceBusJsonSerializer();

                    // Broker connection details and credentials all come from the settings.
                    bus.Host(busSettings.HostAddress, busSettings.Port, busSettings.VirtualHost, null, h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                    // Receive endpoint on "queueTest"; the consumer is constructed by this
                    // factory lambda with the logger resolved above.
                    bus.ReceiveEndpoint("queueTest", ec =>
                    {
                        ec.Consumer(() => new SimpleMessageConsumer(logger));
                    });
                });

                return busControl;

            // The same single instance is exposed as both IBusControl and IBus.
            }).SingleInstance().As<IBusControl>().As<IBus>();
        });
    }
}

in program.cs I have:

// Adds MassTransit's hosted service to the service collection
// (presumably what starts and stops the bus with the host - confirm against the MassTransit docs).
services.AddMassTransitHostedService();

and

// Loads BusModule into the Autofac container builder.
containerBuilder.RegisterModule<BusModule>();

As I mentioned, sending a message to the queue works, but the consumer never runs.

Can you help me find what I did wrong? How should I fix the configuration in order to activate the consumer?


Solution

  • I've updated your configuration to work properly, using the actual bus configuration methods instead of mixing the two solutions:

    /// <summary>
    /// Autofac module that registers the bus settings and configures MassTransit with
    /// RabbitMQ through the registration configurator, so the consumer on the receive
    /// endpoint is the one registered with <c>AddConsumer</c>.
    /// </summary>
    public class BusModule : Module
    {
        /// <summary>Adds the bus settings and MassTransit registrations to the container.</summary>
        /// <param name="builder">The Autofac container builder to register into.</param>
        protected override void Load(ContainerBuilder builder)
        {
            builder.RegisterType<BusSettings>().As<IBusSettings>();

            builder.AddMassTransit(cfg =>
            {
                cfg.AddConsumer<SimpleMessageConsumer, SimpleMessageConsumerDefinition>();

                // The second lambda parameter is the RabbitMQ bus configurator. It is named
                // "bus" because the body refers to it by that name, and because it must not
                // reuse the outer "cfg" name (that would not compile).
                cfg.UsingRabbitMq((context, bus) =>
                {
                    var busSettings = context.GetRequiredService<IBusSettings>();

                    //bus.UseNServiceBusJsonSerializer();

                    bus.Host(busSettings.HostAddress, busSettings.Port, busSettings.VirtualHost, null, h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                    bus.ReceiveEndpoint("queueTest", ec =>
                    {
                        // i'm guessing these apply to the receive endpoint, not the bus endpoint

                        ec.AutoDelete = busSettings.AutoDelete;
                        ec.Durable = busSettings.Durable;
                        ec.Exclusive = busSettings.Exclusive;
                        ec.ExchangeType = busSettings.Type;

                        // Wire up the consumer registered above instead of constructing it by hand.
                        ec.ConfigureConsumer<SimpleMessageConsumer>(context);
                    });
                });
            });
        }
    }