Below is Stateless Service listener code running in a local SF cluster and listening to a Service Bus Queue.
All I want is to continuously listen to queue commands in an always-on SF service. Code improvement hints are welcome!
Problem #1
Abort() is continuously called, effectively closing my connection. What is causing this behavior and how do I fix it? In my understanding Abort() is supposed to be called only in case of unhandled exceptions or forced closure, neither of which I am aware of.
Bonus Problem
Say we comment out the CloseClient call from Abort(), which allows our queue to be processed correctly. After the 1st message the CancellationToken's WaitHandle is marked as disposed and passing it into my callback throws an exception. What is causing this?
Thanks for your help!
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
namespace Common
{
public class QueueListener : ICommunicationListener
{
private readonly string _connectionString;
private readonly string _path;
private readonly Action<BrokeredMessage> _callback;
private QueueClient _client;
public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback)
{
// Set field values
_connectionString = connectionString;
_path = path;
// Save callback action
_callback = callback;
}
public Task<string> OpenAsync(CancellationToken cancellationToken)
{
// Connect to subscription
_client = QueueClient.CreateFromConnectionString(_connectionString, _path);
// Configure the callback options
var options = new OnMessageOptions
{
AutoComplete = false
};
// Catch and throw exceptions
options.ExceptionReceived += (sender, args) => throw args.Exception;
// Wire callback on message receipt
_client.OnMessageAsync(message =>
{
return Task.Run(() => _callback(message), cancellationToken)
.ContinueWith(task =>
{
if (task.Status == TaskStatus.RanToCompletion)
message.CompleteAsync();
else
message.AbandonAsync();
}, cancellationToken);
}, options);
return Task.FromResult(_client.Path);
}
public Task CloseAsync(CancellationToken cancellationToken)
{
CloseClient();
return Task.FromResult(_client.Path);
}
public void Abort()
{
CloseClient();
}
private void CloseClient()
{
// Make sure client is still open
if (_client == null || _client.IsClosed)
return;
// Close connection
_client.Close();
_client = null;
}
}
}
The problem was the whole concept behind creating an ICommunicationListener to interface with a Service Bus; the listener isn't listening to anything!
Refactoring the Service Bus connection into the default RunAsync() method inside the StatelessService implementation fixes the issue and allows monitoring the cancellation token as expected.