Search code examples
c#.netrabbitmqmasstransitmessage-bus

How can I stop MassTransit creating exchange bindings for error messages


I'm trying to listen to the error queue to process failed messages but I can't seem to get MassTransit not to set bindings on message that i want it to listen to within the configuration. The configuration is below and is using v3 of MassTransit:

var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";

_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
  var host = configurator.Host(hostAddress, h =>
  {
    h.Username(username);
    h.Password(password);
  });

  configurator.ReceiveEndpoint(host, "myqueue_error",
  endpointConfigurator =>
  {
    endpointConfigurator.Handler<SomeMessage>(context =>
    {
      return Console.Out.WriteLineAsync("Woop");
    });
  });
});

In the above example it will set bindings up for anything that publishes SomeMessage and direct them in to the myqueue_error which I only want messages going in to this queue which has been forward from the service that are failing. Is there anyway to consume messages from a queue but tell MassTransit not to get bindings up for them?

Update - Potential Solution

It seems that I don't need to setup a ReceiveEndpoint but I can just rename the controlbus to accept the message that I care about, This will then be able to process these messages without creating exchange bindings to the messages.

Below is the altered code, not sure if this is an ideal way but it works

var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";

_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
  configurator.Host(hostAddress, h =>
  {
    h.Username(username);
    h.Password(password);
  });

  // We need to make the queue look like the error queue
  configurator.BusQueueName = $"{_queue}_error";
  configurator.Durable = true;
  configurator.AutoDelete = false;
  configurator.SetQueueArgument("x-expires", null);
});

var connectHandle = _busControl.ConnectHandler<SomeMessage>(context => Console.Out.WriteLineAsync("Woop"));

_busHandle = _busControl.Start();

_busHandle.Ready.Wait();

// Wait

// Clean up

connectHandle.Disconnect();
_busHandle.Stop

Solution

  • From a lot of digging around I found a better solution which I totally missed from the documentation.

    It seems that we can listen to messages by subscribing consumers to listen to Fault This works perfect for what I've been trying to achieve and we can also keep the error queues in tack. http://docs.masstransit-project.com/en/mt3/usage/exceptions.html#handling-exceptions

    So the final bit of configuration that i settle with is the following:

        var hostAddress = new Uri("rabbitmq://localhost/");
        var username = "guest";
        var password = "guest";
    
        _busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
        {
          var host = configurator.Host(hostAddress, h =>
          {
            h.Username(username);
            h.Password(password);
          });
    
          configurator.ReceiveEndpoint(host, "error_listener",
          endpointConfigurator =>
          {
            endpointConfigurator.Handler<Fault<SomeMessage>>(context =>
            {
              return Console.Out.WriteLineAsync("Woop");
            });
          });
        });