Search code examples
rebusrebus-rabbitmq

How to immediately stop processing new messages when inside a message handler?


I have a Rebus bus setup with a single worker and max parallelism of 1 that processes messages "sequentialy". In case an handler fails, or for specific business reason, I'd like the bus instance to immediately stop processing messages.

I tried using the Rebus.Event package to detect the exception in the AfterMessageHandled handler and set the number of workers to 0, but it seems other messages are processed before it can actually succeed in stoping the single worker instance.

Where in the event processing pipeline could I do bus.Advanced.Workers.SetNumberOfWorkers(0); in order to prevent further message processing?

I also tried setting the number of workers to 0 inside a catch block in the handler itself, but it doesn't seem like the right place to do it since SetNumberOfWorkers(0) waits for handlers to complete before returning and the caller is the handler... Looks like a some kind of a deadlock to me.

Thank you


Solution

  • This particular situation is a little bit of a dilemma, because – as you've correctly observed – SetNumberOfWorkers is a blocking function, which will wait until the desired number of threads has been reached.

    In your case, since you're setting it to zero, it means your message handler needs to finish before the number of threads has reached zero... and then: 💣 ☠🔒

    I'm sorry to say this, because I bet your desire to do this is because you're in a pickle somehow – but generally, I must say that wanting to process messages sequentually and in order with message queues is begging for trouble, because there are so many things that can lead to messages being reordered.

    But, I think you can solve your problem by installing a transport decorator, which will bypass the real transport when toggled. If the decorator then returns null from the Receive method, it will trigger Rebus' built-in back-off strategy and start chilling (i.e. it will increase the waiting time between polling the transport).

    Check this out – first, let's create a simple, thread-safe toggle:

    public class MessageHandlingToggle
    {
        public volatile bool ProcessMessages = true;
    }
    

    (which you'll probably want to wrap up and make pretty somehow, but this should do for now)

    and then we'll register it as a singleton in the container (assuming Microsoft DI here):

    services.AddSingleton(new MessageHandlingToggle());
    

    We'll use the ProcessMessages flag to signal whether message processing should be enabled.

    Now, when you configure Rebus, you decorate the transport and give the decorator access to the toggle instance in the container:

    services.AddRebus((configure, provider) =>
        configure
            .Transport(t => {
                t.Use(...);
    
                // install transport decorator here
                t.Decorate(c => {
                    var transport = c.Get<ITransport>();
                    var toggle = provider.GetRequiredService<MessageHandlingToggle>();
                    return new MessageHandlingToggleTransportDecorator(transport, toggle);
                })
            })
            .(...)
    );
    

    So, now you'll just need to build the decorator:

    public class MessageHandlingToggleTransportDecorator : ITransport
    {
        static readonly Task<TransportMessage> NoMessage = Task.FromResult<TransportMessage>(null);
    
        readonly ITransport _transport;
        readonly MessageHandlingToggle _toggle;
    
        public MessageHandlingToggleTransportDecorator(ITransport transport, MessageHandlingToggle toggle)
        {
            _transport = transport;
            _toggle = toggle;
        }   
    
        public string Address => _transport.Address;
    
        public void CreateQueue(string address) => _transport.CreateQueue(address);
    
        public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context) 
            => _transport.Send(destinationAddress, message, context);
    
        public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
            => _toggle.ProcessMessages 
                ? _transport.Receive(context, cancellationToken)
                : NoMessage;
    }
    

    As you can see, it'll just return null when ProcessMessages == false. Only thing left is to decide when to resume processing messages again, pull MessageHandlingToggle from the container somehow (probably by having it injected), and then flick the bool back to true.

    I hope can work for you, or at least give you some inspiration on how you can solve your problem. 🙂