Search code examples
rebusrebus-azureservicebus

Moving Error message from error queue back to original queue with Rebus


Is it possible to move error message from error queue to its original queue, programmatically or via UI?

Update

Questions below on the code below:

1 Does the code below apply to Publiser or Subscriber or both?

The code below:

 Configure.With(activator)
    .Transport(t => (...)) //< use queue "error" here
    .Routing(r =>
    {
        r.AddTransportMessageForwarder(async transportMessage =>
        {
            var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                ? result
                : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");

            return ForwardAction.ForwardTo(sourceQueue);
        });
    })
    .Start();

2 Transport method below works for my code. However, the code above suggests using error queue name, will it work?

Where are the Publiser and Subscriber queue name like below specified if the code above is used?

Please provide code for pub sub pattern.

Publisher:

.Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Publisher))

Subscriber:

.Transport(t=>t.UseAzureServiceBus(Consts.ServiceBusConnectionString, Consts.Subscriber1))

https://github.com/rebus-org/Rebus/wiki/Transport-message-forwarding


Solution

  • Since Rebus uses ordinary queues as its dead-letter queues, it's quite easy to start a bus instance with error as the input queue – then you can e.g. use Rebus' built-in transport message forwarding capability to do that you want to the messages – e.g. forward them to their source queues:

    Configure.With(activator)
        .Transport(t => (...)) //< use queue "error" here
        .Routing(r =>
        {
            r.AddTransportMessageForwarder(async transportMessage =>
            {
                var sourceQueue = transportMessage.Headers.TryGetValue(Headers.SourceQueue, out var result)
                    ? result
                    : throw new ArgumentException($"Could not find '{Headers.SourceQueue}' header");
    
                return ForwardAction.ForwardTo(sourceQueue);
            });
        })
        .Start();
    
    

    or whatever you want in there.

    There also exists a UI, Fleet Manager, that can do this – it replaces the need for dead-letter queues entirely, as it stores failed messages in its database and makes it possible to return the failed messages to their source queues (or another queue, if that's what you want), but it's only available if you're a Rebus Pro subscriber.


    Update (with answers to the questions in your update):

    1) AddTransportMessageForwarder is only relevant for an endpoint that receives messages.

    2) It's the "queue name" specified as an argument to the .Useblablabla method. For example, with Azure Service Bus it would read

    .Transport(t => t.UseAzureServiceBus(Consts.ServiceBusConnectionString, "error"))