I am trying to consume messages from a queue that is created while the application is running. Here is a step-by-step example:
1. A service publishes a new ItemCreatedMessage when a new item is created in my app.
2. The ItemCreatedMessage is consumed by the CreateItemQueuesConsumer consumer.
2.1. In CreateItemQueuesConsumer, I call ConnectReceiveEndpoint to set my queue name, and use the second parameter, IReceiveEndpointConfigurator, to connect a consumer through a consumer factory method.
3. My queues are successfully created and bound to my exchange, and I can see them in the RabbitMQ console.
4. But when I publish a new message to one of these queues, which should be handled by the consumer configured in the factory method, the consumer throws a System.ObjectDisposedException.
This is my code:
    // CreateMachineQueuesConsumer.cs
    public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
    {
        var message = context.Message;
        var machineId = message.MachineId;
        var cancellationToken = context.CancellationToken;
        using var provider = this.serviceProvider.CreateScope();
        var createMachineStatusQueue = this.bus.ConnectReceiveEndpoint(
            string.Format("update_machine_status__{0}", machineId),
            x =>
            {
                if (x is IRabbitMqReceiveEndpointConfigurator c)
                {
                    c.ConfigureConsumeTopology = false;
                    c.ConcurrentMessageLimit = 1;
                    c.PrefetchCount = 1;
                    c.Consumer(() =>
                    {
                        return provider.ServiceProvider
                            .GetRequiredService<UpdateMachineStatusConsumer>();
                    });
                    c.Bind("machine_status_cycle", s =>
                    {
                        s.RoutingKey = machineId.ToString();
                        s.ExchangeType = ExchangeType.Direct;
                    });
                }
            });
        var anotherQueues = ...
When I restart my application, the messages are consumed, because I have a background service that creates the queues at application startup.
To connect receive endpoints at runtime, you should be using IReceiveEndpointConnector in your consumer:
    public class ThisConsumer :
        IConsumer<MachineCreatedMessage>
    {
        // Injected by the container; used to connect endpoints at runtime
        readonly IReceiveEndpointConnector _connector;

        public ThisConsumer(IReceiveEndpointConnector connector)
        {
            _connector = connector;
        }

        public async Task Consume(ConsumeContext<MachineCreatedMessage> context)
        {
            var message = context.Message;
            var machineId = message.MachineId;
            var cancellationToken = context.CancellationToken;

            // busContext is the registration context supplied by the connector
            // (named differently from the ConsumeContext parameter above)
            var queueHandle = _connector.ConnectReceiveEndpoint(string.Format("update_machine_status__{0}", machineId),
                (busContext, cfg) =>
                {
                    cfg.ConfigureConsumeTopology = false;
                    cfg.ConcurrentMessageLimit = 1;
                    cfg.PrefetchCount = 1;

                    // Let the container create the consumer for each message
                    cfg.ConfigureConsumer<UpdateMachineStatusConsumer>(busContext);

                    if (cfg is IRabbitMqReceiveEndpointConfigurator c)
                    {
                        c.Bind("machine_status_cycle", s =>
                        {
                            s.RoutingKey = machineId.ToString();
                            s.ExchangeType = ExchangeType.Direct;
                        });
                    }
                });

            // optional, don't have to wait
            await queueHandle.Ready;
        }
    }
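A likely reason the code in the question throws System.ObjectDisposedException is that the scope declared with `using var` is disposed as soon as Consume returns, while the consumer factory lambda keeps a reference to it and runs later, once per incoming message. The sketch below reproduces that pattern in isolation; FakeScope and ScopeDemo are hypothetical stand-ins for illustration only, not MassTransit or Microsoft.Extensions.DependencyInjection types.

```csharp
using System;

// Hypothetical stand-in for a DI scope: refuses to resolve once disposed.
sealed class FakeScope : IDisposable
{
    private bool _disposed;

    public string Resolve()
    {
        if (_disposed)
            throw new ObjectDisposedException(nameof(FakeScope));
        return "consumer instance";
    }

    public void Dispose() => _disposed = true;
}

static class ScopeDemo
{
    // Mirrors the shape of the question's Consume method: the factory
    // captures the scope, but the scope is disposed when the method returns.
    public static Func<string> Connect()
    {
        using var scope = new FakeScope();
        return () => scope.Resolve();
    }

    static void Main()
    {
        var factory = Connect();
        try
        {
            // Runs after Connect() returned, like a message arriving later.
            factory();
        }
        catch (ObjectDisposedException)
        {
            Console.WriteLine("scope was already disposed");
        }
    }
}
```

Resolving the consumer through ConfigureConsumer, as in the code above, avoids holding on to a scope owned by the Consume method.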
Also, within AddMassTransit, make sure that you register both consumers:
x.AddConsumer<ThisConsumer>();
and
x.AddConsumer<UpdateMachineStatusConsumer>();
This is covered in the documentation.
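Put together, the registration might look roughly like the sketch below. The host name "localhost", the queue name "machine-created", and the Registration/Configure wrapper are assumptions made for illustration; only the two AddConsumer calls come from the answer above.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class Registration
{
    // Hypothetical helper; call it from wherever the app builds its services.
    public static void Configure(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // Both consumers must be registered so the container can build them.
            x.AddConsumer<ThisConsumer>();
            x.AddConsumer<UpdateMachineStatusConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("localhost"); // assumed broker address

                // Static endpoint for the consumer that connects the
                // per-machine endpoints at runtime (queue name is assumed).
                cfg.ReceiveEndpoint("machine-created", e =>
                {
                    e.ConfigureConsumer<ThisConsumer>(context);
                });
            });
        });
    }
}
```

UpdateMachineStatusConsumer is registered but not attached to a static endpoint here, since its endpoints are connected per machine inside ThisConsumer.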