RenewLock error using ServiceBus SubscriptionClient


I am writing a console application in .NET Core 2.1. My intent is to listen for messages on a Topic within ServiceBus and process new messages as they arrive, writing them into Elasticsearch using the NEST API (NEST is probably irrelevant to my question, but I wanted to be transparent).

My Topic entity within ServiceBus is called "test" and I have a subscription also called "test" (full path would be "test/subscriptions/test").

In my .NET Core console app, I have the following NuGet references:

<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />

I am having a very strange issue with the .NET Standard ServiceBus API, where I regularly get a renew-lock error:

Message handler encountered an exception Microsoft.Azure.ServiceBus.MessageLockLostException

I've stripped my code back to a readily reproducible sample:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;

namespace SampleApp
{
    public class Program
    {

    private static SubscriptionClient _subscriptionClient;
    private static IElasticClient _elasticClient;

    private static string ServiceBusConnectionString = "[connectionString]";
    private static string TopicName = "test";
    private static string SubscriptionName = "test";

    public static void Main(string[] args)
    {
        var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
        _elasticClient = new ElasticClient(elasticsearchSettings);

        _subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);

        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
            MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };

        // Register the function that processes messages.
        _subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

        Console.WriteLine("INFO: Process message handler registered, listening for messages");
        Console.Read();
    }

    private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Message received.
        var content = Encoding.UTF8.GetString(message.Body);

        var messageBody = JsonConvert.DeserializeObject<string[]>(content);

        Console.WriteLine($"INFO: Message arrived: {message}");
        Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
        try
        {
            var response = _elasticClient.Ping();

            if (!response.IsValid && response.OriginalException != null)
                Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
            else
                Console.WriteLine("INFO: ElasticSearch was contacted successfully");
        }
        catch (Exception e)
        {
            Console.WriteLine("!ERROR!: " + e);
        }

        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        Console.WriteLine("INFO: Message completed");
    }

    // Use this handler to examine the exceptions received on the message pump.
    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
        return Task.CompletedTask;
    }

    }
}

This code is almost identical to the example taken from here: https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

I'm deliberately "pinging" an Elasticsearch instance that does not exist to produce the socket exception that helps me reproduce the issue.

One thing I've noticed is that when I create a new Topic with EnablePartitioning = false, the problem does not occur.

Has anyone seen this before? Seems to be a problem deep within the ServiceBus code itself.

Note: I also tried reading messages with a receiver using "ReceiveAsync" and got the same error. Also, my reason for testing this is to move off the .NET Framework ServiceBus client (which does work with partitioning) and onto the .NET Core version.

Thanks in advance for any pointers!!


Solution

  • In my case above, the problem was down to a slight misunderstanding of my configuration. Within Azure, if you navigate to:

    Resource Group > ServiceBusInstance > Topics > testTopic > testSubscription

    You can find the subscription properties. Here you will see the lock duration applied to a message when it is received. This defaults to 60 seconds, but to accommodate my long-running process I extended it to the maximum of 5 minutes, as shown below:

    [Screenshot: subscription properties in the Azure portal]

    Then in the code, when wiring up the properties for my Subscription Client, I needed to ensure the MaxAutoRenewDuration property was set correctly.

    I had presumed that if you set this property to, say, 30 seconds, the subscription client would internally renew the lock every 30 seconds, so that with a lock duration of 5 minutes, for example, the lock would keep being renewed for as long as you were processing the message...

    In fact, the property specifies the maximum amount of time for which the subscription client will keep renewing the lock internally.

    So if you set this to 24 hours, e.g. TimeSpan.FromHours(24), and your processing took 12 hours, the lock would be renewed throughout. However, if you set it to 12 hours using TimeSpan.FromHours(12) and your code ran for 24, completing the message would throw a MessageLockLostException (as I was getting above, over shorter intervals!).
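
    To make the distinction concrete, here is a small, deliberately simplified model of the behaviour described above. It is not the SDK's actual renewal algorithm; the class name LockRenewalModel and the method IsLockGuaranteed are hypothetical, and the rule it encodes (the lock is only guaranteed for the longer of the lock duration and MaxAutoRenewDuration) is a conservative assumption:

    ```csharp
    using System;

    // Simplified, conservative model of the behaviour described above.
    // Assumption: this is NOT the SDK's algorithm, only an illustration.
    public static class LockRenewalModel
    {
        // Returns true when the lock is guaranteed to still be held once
        // processing finishes, under these assumptions:
        //  - without renewal, the lock lasts for lockDuration;
        //  - with renewal, the client keeps the lock alive until
        //    maxAutoRenewDuration has elapsed since the message was received.
        public static bool IsLockGuaranteed(
            TimeSpan lockDuration,
            TimeSpan maxAutoRenewDuration,
            TimeSpan processingTime)
        {
            // The guaranteed window is whichever of the two durations is longer.
            TimeSpan guaranteedWindow =
                maxAutoRenewDuration > lockDuration ? maxAutoRenewDuration : lockDuration;

            return processingTime <= guaranteedWindow;
        }
    }
    ```

    With a 5-minute lock, a 24-hour MaxAutoRenewDuration and 12 hours of processing, this returns true; swap the last two values (12-hour renewal window, 24 hours of processing) and it returns false, which is the situation that ends in a lock-lost error on completion.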

    One thing I've done, which was easy to implement, is to pull the LockDuration from the Subscription properties dynamically at runtime (each of my topics could have a different configuration) and set MaxAutoRenewDuration accordingly.

    Code sample:

    sbNamespace.Topics.GetByName("test").Subscriptions.GetByName("test").LockDurationInSeconds
    

    Note: I'm using the Microsoft.Azure.Management.Fluent package to build the sbNamespace.
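
    If you would rather not pull in the Fluent management package, a similar lookup should be possible with the ManagementClient in the Microsoft.Azure.ServiceBus.Management namespace, which I believe ships with the 3.x Microsoft.Azure.ServiceBus package referenced in the question. This is a different technique from the one above, shown only as a sketch. The class LockAwareOptions, the method BuildAsync and the parameter expectedMaxProcessingTime are hypothetical names, and choosing the larger of the two durations is just one possible policy, so treat it as an assumption:

    ```csharp
    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Management;

    // Hypothetical helper: builds handler options from the subscription's
    // configured lock duration instead of a hard-coded value.
    public static class LockAwareOptions
    {
        public static async Task<MessageHandlerOptions> BuildAsync(
            string connectionString,
            string topicName,
            string subscriptionName,
            TimeSpan expectedMaxProcessingTime,
            Func<ExceptionReceivedEventArgs, Task> exceptionHandler)
        {
            // The connection string needs Manage rights for this call to succeed.
            var management = new ManagementClient(connectionString);
            try
            {
                SubscriptionDescription description =
                    await management.GetSubscriptionAsync(topicName, subscriptionName);

                // Lock duration as configured on the subscription in Azure.
                TimeSpan lockDuration = description.LockDuration;

                // Assumed policy: keep renewing for at least as long as the
                // slowest expected handler run, and never for less than one lock.
                TimeSpan maxAutoRenew = expectedMaxProcessingTime > lockDuration
                    ? expectedMaxProcessingTime
                    : lockDuration;

                return new MessageHandlerOptions(exceptionHandler)
                {
                    MaxConcurrentCalls = 1,
                    AutoComplete = false,
                    MaxAutoRenewDuration = maxAutoRenew
                };
            }
            finally
            {
                await management.CloseAsync();
            }
        }
    }
    ```

    The returned options can then be passed to RegisterMessageHandler exactly as in the question's Main method, in place of the hard-coded TimeSpan.FromSeconds(400).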

    Hopefully that helps others!