Register ReceiveEndpoint with RoutingKey at runtime


I built a solution for bulk document processing in which a single document takes about 10-30 s to process. With a single queue, when one client posts 100 documents, a second client that posts just one document has to wait too long for its result. So I changed the topology to direct exchanges with the client ID as the routing key, and that works fine. However, whenever I need to enable another client ID, I have to restart the service. I am looking for a way to do the following:

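    cfg.Send<MessageA>(x =>
    {
        // use the customer ID as the routing key
        x.UseRoutingKeyFormatter(context => context.Message.CustomerID.ToString());

        // multiple conventions can be set, in this case also CorrelationId
        x.UseCorrelationId(context => context.TransactionId);
    });

    // publish MessageA to a direct exchange so the routing key selects the queue
    cfg.Publish<MessageA>(x => x.ExchangeType = ExchangeType.Direct);

    // one queue per client ID (0..4), each bound with its own routing key
    for (int i = 0; i < 5; i++)
    {
        // copy the loop variable so the lambdas do not depend on when they run
        var routingKey = i.ToString();

        cfg.ReceiveEndpoint($"MessageA-{routingKey}-ConsumerA", x =>
        {
            // skip the default binding; the explicit Bind below replaces it
            x.ConfigureConsumeTopology = false;
            x.ConfigureConsumer<ConsumerA>(context);
            x.Bind<MessageA>(s =>
            {
                s.RoutingKey = routingKey;
                s.ExchangeType = ExchangeType.Direct;
            });
        });
    }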

at runtime, without restarting the service.


Solution

  • As advised in the comments, I am writing down the solution I ended up with.

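    // item holds the client ID; _endpointConnector is presumably an injected
    // IReceiveEndpointConnector (its callback receives both context and config)
    var queueName = $"MessageA-{(string)item}-MessageAConsumer";

    var handle = _endpointConnector.ConnectReceiveEndpoint(queueName, (context, config) =>
    {
        // skip the default binding; the explicit Bind below replaces it
        config.ConfigureConsumeTopology = false;
        config.ConfigureConsumer<MessageAConsumer>(context);

        // Bind is RabbitMQ-specific, hence the cast
        ((IRabbitMqReceiveEndpointConfigurator)config).Bind<MessageA>(s =>
        {
            s.RoutingKey = (string)item;
            s.ExchangeType = ExchangeType.Direct;
        });
        config.ConfigureTransport(cfg =>
        {
            cfg.ConcurrentMessageLimit = 20;
            cfg.PrefetchCount = 20;
        });
    });

    // optional, inside an async method: await handle.Ready; to wait until the endpoint has started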
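
    If clients are enabled and disabled repeatedly, it may help to keep the returned handles so that the same queue is not connected twice and an endpoint can be stopped again. The following is only a minimal sketch under a few assumptions: `ClientEndpointRegistry`, `EnableClientAsync` and `DisableClientAsync` are hypothetical names and not part of MassTransit, the `MessageA` and `MessageAConsumer` definitions are stand-ins for the real types (which are not shown above), and the consumer is assumed to be registered with `AddConsumer<MessageAConsumer>()`. The transport limits from the snippet above are left out for brevity and could be set in the same callback.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using RabbitMQ.Client; // for ExchangeType.Direct

// Stand-ins for the types from the question; their real shapes are assumptions.
public record MessageA(int CustomerID, Guid TransactionId);

public class MessageAConsumer : IConsumer<MessageA>
{
    public Task Consume(ConsumeContext<MessageA> context) => Task.CompletedTask;
}

// Hypothetical helper (not a MassTransit type): at most one receive endpoint per client ID.
public class ClientEndpointRegistry
{
    private readonly IReceiveEndpointConnector _endpointConnector;
    private readonly SemaphoreSlim _gate = new(1, 1);
    private readonly Dictionary<string, HostReceiveEndpointHandle> _handles = new();

    public ClientEndpointRegistry(IReceiveEndpointConnector endpointConnector)
        => _endpointConnector = endpointConnector;

    public async Task EnableClientAsync(string clientId, CancellationToken cancellationToken = default)
    {
        // Serialize callers so two requests for the same client cannot both connect.
        await _gate.WaitAsync(cancellationToken);
        try
        {
            if (_handles.ContainsKey(clientId))
                return; // endpoint for this client is already connected

            var handle = _endpointConnector.ConnectReceiveEndpoint(
                $"MessageA-{clientId}-MessageAConsumer",
                (context, config) =>
                {
                    config.ConfigureConsumeTopology = false;
                    config.ConfigureConsumer<MessageAConsumer>(context);
                    ((IRabbitMqReceiveEndpointConfigurator)config).Bind<MessageA>(s =>
                    {
                        s.RoutingKey = clientId;
                        s.ExchangeType = ExchangeType.Direct;
                    });
                });

            _handles.Add(clientId, handle);

            // Wait until the endpoint has started before reporting success.
            await handle.Ready;
        }
        finally
        {
            _gate.Release();
        }
    }

    public async Task DisableClientAsync(string clientId, CancellationToken cancellationToken = default)
    {
        await _gate.WaitAsync(cancellationToken);
        try
        {
            // Stop the endpoint only if this registry connected it earlier.
            if (_handles.Remove(clientId, out var handle))
                await handle.StopAsync(cancellationToken);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

    Assuming the registry is registered as a singleton alongside `AddMassTransit`, enabling a new client would then be a single call such as `await registry.EnableClientAsync("42");`, for example from an admin endpoint or a configuration-change handler. The semaphore keeps the sketch simple at the cost of enabling clients one at a time.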