I recently needed to use Service Bus topics and subscriptions, and I've followed many articles and tutorials. I successfully implemented Microsoft's "Get started with Service Bus topics" and also used Visual Studio 2017's Worker Role template to access a database.
However, I'm confused about how to properly "combine" the two. The "Get started with Service Bus topics" article shows how to create two apps, one to send and one to receive, each of which then quits, whereas the Worker Role template seems to loop endlessly with await Task.Delay(10000);.
I'm not sure how to "mesh" the two properly. Essentially, I want my Worker Role to stay alive and listen for entries into its subscription forever (or until it quits, obviously).
Any guidance would be great!
P.S.: I've asked a related question concerning proper technology I should use for my case scenario at StackExchange - Software Engineering if you are interested.
Update #1 (2018/08/09)
Based on Arunprabhu's answer, here is some code showing how I'm sending a Message, based on articles I've read, and how I'm receiving using Visual Studio 2017's Worker Role with Service Bus Queue template.
Sending (based on Get started with Service Bus topics)
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace TopicsSender {
    internal static class Program {
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private static ITopicClient _topicClient;

        private static void Main(string[] args) {
            MainAsync().GetAwaiter().GetResult();
        }

        private static async Task MainAsync() {
            const int numberOfMessages = 10;
            _topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
            Console.WriteLine("======================================================");
            Console.WriteLine("Press ENTER key to exit after sending all the messages.");
            Console.WriteLine("======================================================");
            // Send messages.
            await SendMessagesAsync(numberOfMessages);
            Console.ReadKey();
            await _topicClient.CloseAsync();
        }

        private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
            try {
                for (var i = 0; i < numberOfMessagesToSend; i++) {
                    // Create a new message to send to the topic
                    var messageBody = $"Message {i}";
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));
                    // Write the body of the message to the console
                    Console.WriteLine($"Sending message: {messageBody}");
                    // Send the message to the topic
                    await _topicClient.SendAsync(message);
                }
            } catch (Exception exception) {
                Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
            }
        }
    }
}
Receiving (based on the Worker Role with Service Bus Queue template)
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        // Connection string, topic name, and subscription name
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // SubscriptionClient is thread-safe. Recommended that you cache
        // rather than recreating it on every request
        private SubscriptionClient _client;
        private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");
            // Initiates the message pump; the callback is invoked for each message that is received.
            // Calling Close on the client will stop the pump.
            _client.OnMessage((receivedMessage) => {
                try {
                    // Process the message
                    Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
                    var message = receivedMessage.GetBody<byte[]>();
                    Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{message.ToString()}");
                } catch (Exception e) {
                    // Handle any message processing specific exceptions here
                    Trace.Write(e.ToString());
                }
            });
            _completedEvent.WaitOne();
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;
            // Initialize the connection to the Service Bus subscription
            _client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
            return base.OnStart();
        }

        public override void OnStop() {
            // Close the connection to the Service Bus subscription
            _client.Close();
            _completedEvent.Set();
            base.OnStop();
        }
    }
}
Update #2 (2018/08/10)
After a few suggestions from Arunprabhu, and now knowing I was using different libraries, below is my current solution with pieces taken from several sources. Is there anything I'm overlooking, or anything I've added that shouldn't be there? I'm currently getting an error that may belong in another question or may already be answered, so I don't want to post it before doing further research.
using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1 {
    public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);

        // Connection string, topic name, and subscription name
        private const string ServiceBusConnectionString = "<your_connection_string>";
        private const string TopicName = "test-topic";
        private const string SubscriptionName = "test-sub1";

        // _client is thread-safe. Recommended that you cache
        // rather than recreating it on every request
        private SubscriptionClient _client;

        public override void Run() {
            Trace.WriteLine("Starting processing of messages");
            try {
                this.RunAsync(this._cancellationTokenSource.Token).Wait();
            } catch (Exception e) {
                Trace.WriteLine("Exception");
                Trace.WriteLine(e.ToString());
            } finally {
                Trace.WriteLine("Finally...");
                this._runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;
            var result = base.OnStart();
            Trace.WriteLine("WorkerRole has been started");
            return result;
        }

        public override void OnStop() {
            // Signal cancellation and wait for Run to finish
            this._cancellationTokenSource.Cancel();
            this._runCompleteEvent.WaitOne();
            base.OnStop();
        }

        private async Task RunAsync(CancellationToken cancellationToken) {
            // Configure the client
            RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);
            _runCompleteEvent.WaitOne();
            Trace.WriteLine("Closing");
            await _client.CloseAsync();
        }

        private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
            _client = new SubscriptionClient(connectionString, topicName, subscriptionName);
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
                // Maximum number of concurrent calls to the callback ProcessMessageAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,
                // Indicates whether the message pump should automatically complete messages after returning from the user callback.
                // False means Complete is handled by the user callback, as in ProcessMessageAsync below.
                AutoComplete = false,
            };
            _client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
        }

        private async Task ProcessMessageAsync(Message message, CancellationToken token) {
            try {
                // Process the message
                Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                await _client.CompleteAsync(message.SystemProperties.LockToken);
            } catch (Exception e) {
                // Handle any message processing specific exceptions here
                Trace.Write(e.ToString());
                await _client.AbandonAsync(message.SystemProperties.LockToken);
            }
        }

        private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}
Considering the complexity of the updated question Update #1 (2018/08/09), I am providing a separate answer.
The sender and receiver are using different libraries:
Sender - Microsoft.Azure.ServiceBus
Receiver - WindowsAzure.ServiceBus
Microsoft.Azure.ServiceBus represents a message as Message, whereas WindowsAzure.ServiceBus uses BrokeredMessage.
Microsoft.Azure.ServiceBus provides a RegisterMessageHandler method, which is the alternative to client.OnMessage() in WindowsAzure.ServiceBus. With it, the listener receives each message as a Message object. This library also supports asynchronous programming, as you expect.
Refer here for samples from both the libraries.
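On the "stay alive" part of the question: RegisterMessageHandler returns right away and the message pump keeps running in the background, so Run() only has to wait until the role is asked to stop. In the Update #2 code as posted, RunAsync blocks on _runCompleteEvent.WaitOne(), but that event is only set in Run()'s finally block after RunAsync has returned, so the wait looks like it can never finish. One possible alternative is to await the cancellation token instead. Below is a minimal sketch of that idea using only the base class library. WorkerLoop, WhenCancelled, RunUntilCancelledAsync, startPump and stopPump are hypothetical names introduced for illustration, not part of either Service Bus library; startPump stands in for the RegisterMessageHandler call and stopPump for _client.CloseAsync().

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WorkerLoop {
    // Completes once the token is cancelled, without blocking a thread while waiting.
    public static async Task WhenCancelled(CancellationToken token) {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        // The registration is disposed as soon as the wait is over.
        using (token.Register(() => tcs.TrySetResult(true))) {
            await tcs.Task.ConfigureAwait(false);
        }
    }

    // startPump: hypothetical stand-in for registering the message handler.
    // stopPump:  hypothetical stand-in for closing the subscription client.
    public static async Task RunUntilCancelledAsync(
            Action startPump, Func<Task> stopPump, CancellationToken token) {
        startPump();                                        // returns immediately; pump runs in the background
        await WhenCancelled(token).ConfigureAwait(false);   // stay alive until a stop is requested
        await stopPump().ConfigureAwait(false);             // shut the pump down cleanly
    }
}
```

In a Worker Role shaped like the one in Update #2, Run() could block on WorkerLoop.RunUntilCancelledAsync(...).GetAwaiter().GetResult() and OnStop() would keep calling Cancel() on the CancellationTokenSource and then waiting for Run() to signal completion. Treat this as a sketch under those assumptions rather than a drop-in replacement.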