
C# MassTransit - Consuming messages from a queue created at runtime throws System.ObjectDisposedException


I am trying to consume messages from a queue that is created while the application is running. The steps are:

  1. A service publishes a new ItemCreatedMessage when a new item is created in my app.

  2. The ItemCreatedMessage is consumed by the CreateItemQueuesConsumer consumer.

    2.1. In CreateItemQueuesConsumer, I call ConnectReceiveEndpoint with my queue name and use its second parameter, an IReceiveEndpointConfigurator, to connect a consumer through a consumer factory method.

  3. My queues are created successfully and bound to my exchange, and I can see them in the RabbitMQ console.

  4. But when I publish a new message to one of these queues, which should be consumed by the consumer configured in the factory method, the consumer throws a System.ObjectDisposedException.

This is my code:

// CreateMachineQueuesConsumer.cs

public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
    {
        var message = context.Message;
        var machineId = message.MachineId;
        var cancellationToken = context.CancellationToken;

        using var provider = this.serviceProvider.CreateScope();

        var createMachineStatusQueue = this.bus.ConnectReceiveEndpoint(
            string.Format("update_machine_status__{0}", machineId),
            x =>
            {
                if (x is IRabbitMqReceiveEndpointConfigurator c)
                {
                    c.ConfigureConsumeTopology = false;
                    c.ConcurrentMessageLimit = 1;
                    c.PrefetchCount = 1;

                    c.Consumer(() =>
                    {
                        return provider.ServiceProvider
                            .GetRequiredService<UpdateMachineStatusConsumer>();
                    });

                    c.Bind("machine_status_cycle", s =>
                    {
                        s.RoutingKey = machineId.ToString();
                        s.ExchangeType = ExchangeType.Direct;
                    });
                }
            });

            var anotherQueues = ...

When I restart my application, the messages are consumed, because I have a background service that creates the queues at application startup.


Solution

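  • To connect receive endpoints, you should be using IReceiveEndpointConnector in your consumer. In your code, the scope created with "using var provider" is disposed as soon as Consume returns, but the consumer factory still resolves from that scope when a message arrives later, which is why you get the ObjectDisposedException.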

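    using System.Threading.Tasks;
    using MassTransit;
    using RabbitMQ.Client; // ExchangeType

    public class ThisConsumer :
        IConsumer<MachineCreatedMessage>
    {
        private readonly IReceiveEndpointConnector _connector;

        public ThisConsumer(IReceiveEndpointConnector connector)
        {
            _connector = connector;
        }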
    
        public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
        {
            var message = context.Message;
            var machineId = message.MachineId;
            var cancellationToken = context.CancellationToken;
    
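            var queueHandle = _connector.ConnectReceiveEndpoint(string.Format("update_machine_status__{0}", machineId), 
                // named registrationContext so it does not shadow the outer ConsumeContext
                (registrationContext, cfg) =>
                {
                    cfg.ConfigureConsumeTopology = false;
                    cfg.ConcurrentMessageLimit = 1;
                    cfg.PrefetchCount = 1;
    
                    // resolve the consumer from the container for each message
                    cfg.ConfigureConsumer<UpdateMachineStatusConsumer>(registrationContext);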
    
                    if (cfg is IRabbitMqReceiveEndpointConfigurator c)
                    {
                        c.Bind("machine_status_cycle", s =>
                        {
                            s.RoutingKey = machineId.ToString();
                            s.ExchangeType = ExchangeType.Direct;
                        });
                    }
                });
    
            // optional, don't have to wait
            await queueHandle.Ready;
        }
    }
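
The failure in the question can be reproduced without MassTransit. The following is a minimal sketch, assuming only the Microsoft.Extensions.DependencyInjection package; the empty UpdateMachineStatusConsumer class and the Program wrapper are stand-ins for illustration, not the real types. It captures a scope in a factory delegate, disposes the scope, and then invokes the factory, which is what happens when the endpoint created in Consume receives its first message after Consume has returned.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Stand-in for the real consumer; only its registration matters here.
public class UpdateMachineStatusConsumer { }

public static class Program
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddScoped<UpdateMachineStatusConsumer>();
        using var root = services.BuildServiceProvider();

        Func<UpdateMachineStatusConsumer> factory;

        using (var scope = root.CreateScope())
        {
            // The delegate captures the scope, like the c.Consumer(() => ...) factory in the question.
            factory = () => scope.ServiceProvider.GetRequiredService<UpdateMachineStatusConsumer>();
        } // the scope is disposed here, just as it is when Consume returns

        try
        {
            // Simulates a message arriving after the scope is gone.
            factory();
        }
        catch (ObjectDisposedException ex)
        {
            Console.WriteLine($"Factory failed: {ex.GetType().Name}");
        }
    }
}
```

Resolving through ConfigureConsumer avoids this because the consumer is resolved from the container when each message is handled, instead of from a scope that belonged to the Consume call.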
    

    Also, within AddMassTransit make sure that you call both:

    x.AddConsumer<ThisConsumer>(); and x.AddConsumer<UpdateMachineStatusConsumer>();

    This is covered in the documentation.
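
The registration described above might look like the following sketch. It assumes the MassTransit.RabbitMQ package and a local broker with default credentials; the MessagingRegistration class, the AddMessaging method, and the "machine_created" queue name are hypothetical names chosen for illustration. ThisConsumer and UpdateMachineStatusConsumer are the classes from the answer.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class MessagingRegistration
{
    // Hypothetical helper; call it from your startup code.
    public static IServiceCollection AddMessaging(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // Both consumers must be registered so the container can resolve them.
            x.AddConsumer<ThisConsumer>();
            x.AddConsumer<UpdateMachineStatusConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // Assumed broker address and credentials.
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // Only ThisConsumer gets a static endpoint; the queue name is an assumption.
                // UpdateMachineStatusConsumer is connected at runtime, one endpoint per machine.
                cfg.ReceiveEndpoint("machine_created", e =>
                {
                    e.ConfigureConsumer<ThisConsumer>(context);
                });
            });
        });

        return services;
    }
}
```

The endpoint for ThisConsumer is configured explicitly here so that UpdateMachineStatusConsumer is registered in the container without also being given a static queue of its own.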