c# azureservicebus azure-servicebus-queues cancellationtokensource

Cancellation token processing in multiple consumers of a Service Bus queue


I have a configurable number of Service Bus queue consumers in a single process. Each consumer calls the ReceiveAsync method of the QueueClient class and invokes QueueClient.Close on cancellation.

This works pretty well, but there turned out to be a problem with closing the QueueClient: only one client ends immediately, while all the others hang until the serverWaitTime timeout expires.

Look at the code and its output:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class Program
{
    private static void Main()
    {
        CancellationTokenSource source = new CancellationTokenSource();
        var cancellationToken = source.Token;
        var logger = new Logger();

        Task.Run(() =>
        {
            Task.Delay(TimeSpan.FromSeconds(10)).Wait();
            source.Cancel();
            logger.Log("Cancellation requested.");
        });

        string connectionString = "...";
        string queueName = "...";

        var workers = Enumerable.Range(1, 3).Select(i => new Worker(connectionString, queueName, logger));
        var tasks = workers.Select(worker => Task.Run(() => worker.RunAsync(cancellationToken), cancellationToken)).ToArray();
        Task.WaitAll(tasks);
        logger.Log("The end.");
    }
}

class Worker
{
    private readonly Logger _logger;
    private readonly QueueClient _queueClient;

    public Worker(string connectionString, string queueName, Logger logger)
    {
        _logger = logger;
        _queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
    }

    public async Task RunAsync(CancellationToken cancellationToken)
    {
        _logger.Log($"Worker {GetHashCode()} started.");
        using (cancellationToken.Register(() => _queueClient.Close()))
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
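                    var message = await _queueClient.ReceiveAsync(TimeSpan.FromSeconds(20));
                    // ReceiveAsync returns null when the wait times out without a message.
                    if (message == null) continue;
                    _logger.Log($"Worker {GetHashCode()}: Process message {message.MessageId}...");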
                }
                catch (OperationCanceledException ex)
                {
                    _logger.Log($"Worker {GetHashCode()}: {ex.Message}");
                }
            }
        _logger.Log($"Worker {GetHashCode()} finished.");
    }
}

class Logger
{
    private readonly Stopwatch _stopwatch;

    public Logger()
    {
        _stopwatch = new Stopwatch();
        _stopwatch.Start();
    }

    public void Log(string message) => Console.WriteLine($"{_stopwatch.Elapsed}: {message}");
}

Output:

00:00:00.8125644: Worker 12547953 started.
00:00:00.8127684: Worker 45653674 started.
00:00:00.8127314: Worker 59817589 started.
00:00:10.4534961: Cancellation requested.
00:00:11.4912900: Worker 45653674: The operation cannot be performed because the entity has been closed or aborted.
00:00:11.4914054: Worker 45653674 finished.
00:00:22.3242631: Worker 12547953: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3244501: Worker 12547953 finished.
00:00:22.3243945: Worker 59817589: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3252456: Worker 59817589 finished.
00:00:22.3253535: The end.

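As you can see, worker 45653674 stopped about a second after cancellation was requested, but the other two did not stop until roughly 12 seconds after the request, around the time their 20-second ReceiveAsync wait expired.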


Solution

  • I found some helpful information in this article: https://developers.de/blogs/damir_dobric/archive/2013/12/03/service-bus-undocumented-scaling-tips-amp-tricks.aspx. The issue goes away if each queue client uses its own physical connection.

    To fix the issue, replace the following code:

    _queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
    

    with

    var factory = MessagingFactory.CreateFromConnectionString(connectionString);
    _queueClient = factory.CreateQueueClient(queueName);
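
For context, here is a rough sketch of how the whole Worker class might look with that change applied. It keeps the close-on-cancellation approach from the question. Two things in it are my own assumptions rather than part of the fix above: the factory is stored in a field (`_factory`) so it can be closed when the worker finishes, and a null check is added for the case where ReceiveAsync times out without a message. The Logger is replaced with Console.WriteLine to keep the sketch short.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

class Worker
{
    // Assumption: the factory is kept in a field so it can be closed later.
    private readonly MessagingFactory _factory;
    private readonly QueueClient _queueClient;

    public Worker(string connectionString, string queueName)
    {
        // One factory per worker, so each queue client gets its own connection.
        _factory = MessagingFactory.CreateFromConnectionString(connectionString);
        _queueClient = _factory.CreateQueueClient(queueName);
    }

    public async Task RunAsync(CancellationToken cancellationToken)
    {
        // Closing the client on cancellation interrupts the pending ReceiveAsync call.
        using (cancellationToken.Register(() => _queueClient.Close()))
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    BrokeredMessage message =
                        await _queueClient.ReceiveAsync(TimeSpan.FromSeconds(20));

                    // Null means the wait timed out and no message arrived.
                    if (message == null) continue;

                    Console.WriteLine($"Worker {GetHashCode()}: Process message {message.MessageId}...");
                }
                catch (OperationCanceledException ex)
                {
                    // Thrown when the client is closed while a receive is pending.
                    Console.WriteLine($"Worker {GetHashCode()}: {ex.Message}");
                }
            }
        }

        // Assumption: release the dedicated connection once the worker is done.
        _factory.Close();
    }
}
```

Keep in mind that every worker now holds its own connection to the namespace, so the number of open connections grows with the configured worker count.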