Search code examples
c#azureazure-service-fabricazureservicebusazure-servicebus-queues

Why is ICommunicationListener.Abort() continuously invoked?


Below is Stateless Service listener code running in a local SF cluster and listening to a Service Bus Queue.

All I want is to continuously listen to queue commands in an always-on SF service. Code improvement hints are welcome!

Problem #1

Abort() is continuously called, effectively closing my connection. What is causing this behavior and how do I fix it? In my understanding Abort() is supposed to be called only in case of unhandled exceptions or forced closure, neither of which I am aware of.

Bonus Problem

Say we comment out the CloseClient call from Abort(), which allows our queue to be processed correctly. After the 1st message the CancellationToken's WaitHandle is marked as disposed and passing it into my callback throws an exception. What is causing this?

Thanks for your help!

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.ServiceFabric.Services.Communication.Runtime;

namespace Common
{
    public class QueueListener : ICommunicationListener
    {
        private readonly string _connectionString;
        private readonly string _path;
        private readonly Action<BrokeredMessage> _callback;

        private QueueClient _client;

        public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback)
        {
            // Set field values
            _connectionString = connectionString;
            _path = path;

            // Save callback action
            _callback = callback;
        }

        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
            // Connect to subscription
            _client = QueueClient.CreateFromConnectionString(_connectionString, _path);

            // Configure the callback options
            var options = new OnMessageOptions
            {
                AutoComplete = false
            };

            // Catch and throw exceptions
            options.ExceptionReceived += (sender, args) => throw args.Exception;

            // Wire callback on message receipt
            _client.OnMessageAsync(message =>
            {
                return Task.Run(() => _callback(message), cancellationToken)
                    .ContinueWith(task =>
                    {
                        if (task.Status == TaskStatus.RanToCompletion)
                            message.CompleteAsync();
                        else
                            message.AbandonAsync();
                    }, cancellationToken);
            }, options);

            return Task.FromResult(_client.Path);
        }

        public Task CloseAsync(CancellationToken cancellationToken)
        {
            CloseClient();
            return Task.FromResult(_client.Path);
        }

        public void Abort()
        {
            CloseClient();
        }

        private void CloseClient()
        {
            // Make sure client is still open
            if (_client == null || _client.IsClosed)
                return;

            // Close connection
            _client.Close();
            _client = null;
        }
    }
}

Solution

  • The problem was the whole concept behind creating an ICommunicationListener to interface with a Service Bus; the listener isn't listening to anything!

    Refactoring the Service Bus connection into the default RunAsync() method inside the StatelessService implementation fixes the issue and allows monitoring the cancellation token as expected.