azure azureservicebus nimbus

Message being retried when operation takes time


I have a messaging system using Azure ServiceBus but I'm using Nimbus on top of that. I have an endpoint that sends a command to another endpoint and at one point the handler class on the other side picks it up, so it is all working fine.

When the operation takes a while, roughly more than 20 seconds, the handler gets another call with the same message. It looks like Nimbus is retrying a message that is already being handled by another (or even the same) instance of the handler. I don't see any exceptions being thrown, and I can easily reproduce this with the following handler:

public class Synchronizer : IHandleCommand<RequestSynchronization>
{
    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        Console.WriteLine("Received Synchronization");

        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process

        Console.WriteLine("Got through first timeout");

        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process

        Console.WriteLine("Got through second timeout");
    }
}

My question is: how do I disable this behavior? I am happy for the operation to take time, as it is a heavy process that I have off-loaded from my website, which was the whole point of going with this architecture in the first place.

In other words, I was expecting the message not to be picked up by another handler while one is already processing it, unless there's an exception, in which case the message goes back to the queue and eventually gets picked up for a retry.

Any ideas how to do this? Anything I'm missing?


Solution

  • By default, ASB/WSB will give you a message lock of 30 seconds. The idea is that you pop a BrokeredMessage off the head of the queue but have to either .Complete() or .Abandon() that message within the lock timeout.

    If you don't do that, the service bus assumes that you've crashed or otherwise failed and it will return that message to the queue to be re-processed.

    You have a couple of options:

    1) Implement ILongRunningTask on your handler. Nimbus will pay attention to the remaining lock time and automatically renew your message lock. Caution: The maximum message lock time supported by ASB/WSB is five minutes no matter how many times you renew, so if your handler takes longer than that then you might want option #2.

    public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask
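    {
        // Assumption: ILongRunningTask declares an IsAlive property in the Nimbus version in use,
        // which Nimbus checks while renewing the lock. Omit this member if your version does not declare it.
        public bool IsAlive { get { return true; } }
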
        public async Task Handle(RequestSynchronization synchronizeInfo)
        {
            Console.WriteLine("Received Synchronization");
    
            await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
    
            Console.WriteLine("Got through first timeout");
    
            await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
    
            Console.WriteLine("Got through second timeout");
        }
    }
    

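    2) In your handler, call Task.Run(() => SomeService(yourMessage)) and return. If you do this, be careful about lifetime scoping of dependencies if your handler takes any. If you need an IFoo, take a dependency on a Func<Owned<IFoo>> (or equivalent, depending on your container) and resolve it within your handling task. Bear in mind that the message is completed as soon as the handler returns, so a failure inside the background task will not put the message back on the queue.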

    public class Synchronizer : IHandleCommand<RequestSynchronization>
    {
        private readonly Func<Owned<IFoo>> _fooFunc;
    
        public Synchronizer(Func<Owned<IFoo>> fooFunc)
        {
            _fooFunc = fooFunc;
        }
    
        public async Task Handle(RequestSynchronization synchronizeInfo)
        {
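            // don't await! The handler returns immediately and the work continues in the background.
            // The lambda must be async because it awaits inside.
            Task.Run(async () => {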
                using (var foo = _fooFunc())
                {
                  Console.WriteLine("Received Synchronization");
    
                  await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
    
                  Console.WriteLine("Got through first timeout");
    
                  await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
    
                  Console.WriteLine("Got through second timeout");
                }
    
            });
        }
    }
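
For a sense of what option 1 involves, here is a minimal, self-contained sketch of lock renewal in general. This is not Nimbus's actual implementation: `LockKeepAlive`, `RunWithRenewal` and the `renewLock` delegate are hypothetical names made up for illustration. With the raw Service Bus SDK the delegate would typically wrap something like `BrokeredMessage.RenewLockAsync()`; here it is a stub so the example runs without a bus.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Nimbus or the Service Bus SDK.
public static class LockKeepAlive
{
    // Starts `work` and calls `renewLock` every `renewInterval` until the work finishes.
    // Returns the number of renewals performed.
    public static async Task<int> RunWithRenewal(
        Func<Task> work, Func<Task> renewLock, TimeSpan renewInterval)
    {
        var renewals = 0;
        using (var cts = new CancellationTokenSource())
        {
            var workTask = work();
            while (true)
            {
                var timer = Task.Delay(renewInterval, cts.Token);
                var finished = await Task.WhenAny(workTask, timer);
                if (finished == workTask) break;

                // The interval elapsed and the work is still running: extend the lock.
                await renewLock();
                renewals++;
            }

            cts.Cancel();   // stop the pending timer
            await workTask; // surface any exception thrown by the work
        }
        return renewals;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var renewals = await LockKeepAlive.RunWithRenewal(
            work: () => Task.Delay(TimeSpan.FromMilliseconds(350)), // stand-in for the long-running handler
            renewLock: () =>
            {
                Console.WriteLine("Renewing lock"); // stub; a real bus call would go here
                return Task.CompletedTask;
            },
            renewInterval: TimeSpan.FromMilliseconds(100));

        Console.WriteLine($"Work finished after {renewals} renewal(s)");
    }
}
```

The renewal interval should be comfortably shorter than the lock duration so that a slow renewal call does not let the lock lapse. If the work throws, the exception propagates to the caller, which is where you would abandon the message rather than complete it.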