Azure Service Bus Topics and Subscriptions with Worker Role


I recently needed to use Service Bus topics and subscriptions, and I've followed many articles and tutorials. I've successfully implemented Microsoft's Get started with Service Bus topics and also used Visual Studio 2017's Worker Role template to access a database.

However, I'm confused about how to properly "combine" the two. The Get started with Service Bus topics article shows how to create two apps, one that sends and one that receives, each of which then quits, whereas the Worker Role template seems to loop endlessly with await Task.Delay(10000);.

I'm not sure how to "mesh" the two properly. Essentially, I want my Worker Role to stay alive and listen for messages arriving in its subscription forever (or until the role is stopped, obviously).

Any guidance would be great!

P.S.: I've also asked a related question on StackExchange - Software Engineering about which technology is appropriate for my scenario, if you are interested.

Update #1 (2018/08/09)

Based on Arunprabhu's answer, here is the code I'm using. The sender is based on articles I've read, and the receiver is based on Visual Studio 2017's Worker Role with Service Bus Queue template.

Sending (based on Get started with Service Bus topics)

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace TopicsSender {
    internal static class Program {
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private static ITopicClient _topicClient;

        private static void Main(string[] args) {
            MainAsync().GetAwaiter().GetResult();
        }

        private static async Task MainAsync() {
            const int numberOfMessages = 10;
            _topicClient = new TopicClient(ServiceBusConnectionString, TopicName);

            Console.WriteLine("======================================================");
            Console.WriteLine("Press any key to exit after sending all the messages.");
            Console.WriteLine("======================================================");

            // Send messages.
            await SendMessagesAsync(numberOfMessages);

            Console.ReadKey();

            await _topicClient.CloseAsync();
        }

        private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
            try {
                for (var i = 0; i < numberOfMessagesToSend; i++) {
                    // Create a new message to send to the topic
                    var messageBody = $"Message {i}";
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                    // Write the body of the message to the console
                    Console.WriteLine($"Sending message: {messageBody}");

                    // Send the message to the topic
                    await _topicClient.SendAsync(message);
                }
            } catch (Exception exception) {
                Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
            }
        }
    }
}

Receiving (based on Worker Role with Service Bus Queue template)

using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        // Connection string, topic, and subscription to listen on
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // SubscriptionClient is thread-safe. Recommended that you cache
        // rather than recreating it on every request
        private SubscriptionClient _client;
        private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
            _client.OnMessage((receivedMessage) => {
                try {
                    // Process the message
                    Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
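                    var message = receivedMessage.GetBody<byte[]>();
                    // Decode the bytes; calling ToString() on a byte[] only prints "System.Byte[]"
                    Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{System.Text.Encoding.UTF8.GetString(message)}");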
                } catch (Exception e) {
                    // Handle any message processing specific exceptions here
                    Trace.Write(e.ToString());
                }
            });

            _completedEvent.WaitOne();
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Initialize the connection to the Service Bus subscription
            _client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
            return base.OnStart();
        }

        public override void OnStop() {
            // Close the connection to the Service Bus subscription
            _client.Close();
            _completedEvent.Set();
            base.OnStop();
        }
    }
}
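
The sender above writes each body as UTF-8 bytes, so whichever library receives it has to decode those bytes before logging them. A minimal sketch of that round trip, assuming the body reaches the receiver as the same raw bytes (MessageBodyText is a hypothetical helper, not part of either Service Bus library):

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration only; it uses no Service Bus types.
public static class MessageBodyText
{
    // Encodes text the same way the sender does: Encoding.UTF8.GetBytes.
    public static byte[] Encode(string text) => Encoding.UTF8.GetBytes(text);

    // Decodes a UTF-8 body back into the original text.
    public static string Decode(byte[] body) => Encoding.UTF8.GetString(body);
}
```

MessageBodyText.Decode(MessageBodyText.Encode("Message 0")) gives back "Message 0", whereas calling ToString() on the byte array itself only yields the type name "System.Byte[]".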

Update #2 (2018/08/10)

After a few suggestions from Arunprabhu, and now knowing I was using different libraries, below is my current solution with pieces taken from several sources. Is there anything I'm overlooking, anything I've added that shouldn't be there, etc.? I'm currently getting an error that may belong in another question or may already be answered, so I don't want to post it before further research.

using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

        // Connection string, topic, and subscription to listen on
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // _client is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        private SubscriptionClient _client;

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");

            try {
                this.RunAsync(this._cancellationTokenSource.Token).Wait();
            } catch (Exception e) {
                Trace.WriteLine("Exception");
                Trace.WriteLine(e.ToString());
            } finally {
                Trace.WriteLine("Finally...");
                this._runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            var result = base.OnStart();

            Trace.WriteLine("WorkerRole has been started");

            return result;
        }

        public override void OnStop() {
            // Signal RunAsync to stop, then wait for Run to finish
            this._cancellationTokenSource.Cancel();
            this._runCompleteEvent.WaitOne();

            base.OnStop();
        }

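        private async Task RunAsync(CancellationToken cancellationToken) {
            // Configure the client
            RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);

            // Block until OnStop cancels the token. Waiting on _runCompleteEvent here would
            // deadlock, because Run() only sets it after this method returns.
            cancellationToken.WaitHandle.WaitOne();

            Trace.WriteLine("Closing");
            await _client.CloseAsync();
        }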

        private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
            _client = new SubscriptionClient(connectionString, topicName, subscriptionName);

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,

                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                AutoComplete = false,
            };

            _client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
        }

        private async Task ProcessMessageAsync(Message message, CancellationToken token) {
            try {
                // Process the message
                Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                await _client.CompleteAsync(message.SystemProperties.LockToken);
            } catch (Exception e) {
                // Handle any message processing specific exceptions here
                Trace.Write(e.ToString());
                await _client.AbandonAsync(message.SystemProperties.LockToken);
            }
        }

        private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}

Solution

  • Considering the complexity of the updated question Update #1 (2018/08/09), I am providing a separate answer.

    The sender and receiver are using different libraries.

    Sender - Microsoft.Azure.ServiceBus

    Receiver - WindowsAzure.ServiceBus

    Microsoft.Azure.ServiceBus represents a message as a Message object, whereas WindowsAzure.ServiceBus uses BrokeredMessage.

    Microsoft.Azure.ServiceBus provides a RegisterMessageHandler method, which is the alternative to client.OnMessage() in WindowsAzure.ServiceBus. With it, the listener receives each message as a Message object. This library also supports asynchronous programming, as you expect.

    Refer here for samples from both the libraries.
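
As for keeping the Worker Role alive once a handler is registered, one possible shape is sketched below. It is a minimal illustration that assumes the role only needs to idle until it is asked to stop. ListenerLifetime and its start and closeAsync parameters are hypothetical stand-ins with no Azure types: in a real role, Run and Stop would be the RoleEntryPoint Run and OnStop overrides, start would call RegisterMessageHandler, and closeAsync would call the subscription client's CloseAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the worker role's Run/OnStop pair.
public class ListenerLifetime
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly ManualResetEvent _runComplete = new ManualResetEvent(false);
    private readonly Action _start;
    private readonly Func<Task> _closeAsync;

    public ListenerLifetime(Action start, Func<Task> closeAsync)
    {
        _start = start;
        _closeAsync = closeAsync;
    }

    // Plays the part of Run(): blocks the calling thread until Stop() is called.
    public void Run()
    {
        try
        {
            RunAsync(_cts.Token).GetAwaiter().GetResult();
        }
        finally
        {
            // Tell Stop() that shutdown has finished.
            _runComplete.Set();
        }
    }

    // Plays the part of OnStop(): requests shutdown, then waits for Run() to finish.
    public void Stop()
    {
        _cts.Cancel();
        _runComplete.WaitOne();
    }

    private async Task RunAsync(CancellationToken token)
    {
        // Register the handler once; after that the callback is invoked per message.
        _start();

        try
        {
            // Idle until cancellation instead of polling in a loop.
            await Task.Delay(Timeout.Infinite, token);
        }
        catch (OperationCanceledException)
        {
            // Expected when Stop() cancels the token.
        }

        // Close the client so no further messages are delivered.
        await _closeAsync();
    }
}
```

The point of the design is that the wait inside RunAsync is released by the cancellation token, while the event is set only after RunAsync has returned, so Stop() cannot end up waiting on a signal that Run() itself is still blocked behind.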