Search code examples
c#azureazure-worker-rolesazureservicebusazure-servicebus-queues

Using QueueClient.OnMessage in an azure worker role


I have an Azure worker role that is responsible for checking 4 service bus queues. Currently, I just use a looping method to manually check the queues.

// Current approach: poll every queue manually inside an endless loop.
while(true)
{
    //loop through my queues to check for messages
}

With the Azure SDK 2.0 came the ability to listen for messages rather than polling for them. But every example I've seen uses a console app with Console.ReadKey(). Is there a way to have the worker role sit and wait on messages too?

I tried:

// Registers the Process callback on every QueueClient in _queueProcessors.
// NOTE: nothing after the registration keeps this method from returning,
// so Run exits as soon as the callbacks have been registered.
public override void Run()
{
    _queueProcessors.ForEach(x => x.OnMessage(Process));
}

where _queueProcessors is a list of QueueClients and Process is a private method that handles the messages. However, the worker role would register them and then restart.

So does anyone know how to make a queue client sit and wait on a message?


Solution

  • Following is a code sample for this:

    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using Microsoft.WindowsAzure.ServiceRuntime;
    using System.Diagnostics;
    using System.Net;
    using System.Threading;
    
    namespace WorkerRoleWithSBQueue1
    {
        /// <summary>
        /// Worker role that receives messages from a Service Bus queue through the
        /// callback-based message pump (<c>QueueClient.OnMessage</c>) instead of
        /// polling the queue in a loop.
        /// </summary>
        public class WorkerRole : RoleEntryPoint
        {
            // The name of your queue
            private const string QueueName = "demoapp";

            // Signalled by OnStop; Run blocks on it so that the method does not
            // return while the message pump is supposed to be running.
            private readonly ManualResetEvent CompletedEvent = new ManualResetEvent(false);

            // QueueClient is thread-safe. Recommended that you cache
            // rather than recreating it on every request
            private QueueClient Client;

            /// <summary>
            /// Starts the message pump on <see cref="Client"/> and then blocks until
            /// <see cref="OnStop"/> signals <see cref="CompletedEvent"/>.
            /// </summary>
            public override void Run()
            {
                var options = new OnMessageOptions
                {
                    // Indicates if the message pump should call Complete on a message
                    // after the callback has completed processing.
                    AutoComplete = true,

                    // Maximum number of concurrent calls to the callback the pump should initiate.
                    MaxConcurrentCalls = 1
                };

                // Get notified of any errors encountered by the message pump.
                options.ExceptionReceived += LogErrors;

                Trace.WriteLine("Starting processing of messages");

                // Start receiving messages: this initiates the message pump, and the
                // callback is invoked for each message that is received. Calling
                // Close on the client stops the pump.
                Client.OnMessage(receivedMessage =>
                    {
                        try
                        {
                            // Process the message
                            Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
                        }
                        catch (System.Exception ex)
                        {
                            // Handle any message processing specific exceptions here.
                            // Do not swallow the failure: with AutoComplete enabled, a
                            // callback that returns normally causes the message to be
                            // completed, which would silently drop a message that was
                            // never actually processed. Rethrowing leaves the decision
                            // to the pump and surfaces the error via ExceptionReceived.
                            Trace.WriteLine("Error processing message: " + ex.Message);
                            throw;
                        }
                    }, options);

                // Keep Run alive until OnStop releases it.
                CompletedEvent.WaitOne();
            }

            /// <summary>
            /// Handler for <c>OnMessageOptions.ExceptionReceived</c>; writes the
            /// exception message to the trace output.
            /// </summary>
            /// <param name="sender">The object that raised the event.</param>
            /// <param name="e">Event data carrying the exception, if any.</param>
            private void LogErrors(object sender, ExceptionReceivedEventArgs e)
            {
                if (e.Exception != null)
                {
                    Trace.WriteLine("Error: " + e.Exception.Message);
                }
            }

            /// <summary>
            /// Creates the queue if it does not exist, opens the queue client and
            /// sends a handful of sample messages for the pump to pick up.
            /// </summary>
            /// <returns>The result of <c>base.OnStart()</c>.</returns>
            public override bool OnStart()
            {
                // Set the maximum number of concurrent connections
                ServicePointManager.DefaultConnectionLimit = 12;

                // Create the queue if it does not exist already
                Trace.WriteLine("Creating Queue");
                string connectionString = "*** provide your connection string here***";
                var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
                if (!namespaceManager.QueueExists(QueueName))
                {
                    namespaceManager.CreateQueue(QueueName);
                }

                // Initialize the connection to Service Bus Queue
                Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);

                Trace.WriteLine("Sending messages...");
                // populate some messages
                for (int ctr = 0; ctr < 10; ctr++)
                {
                    // BrokeredMessage is disposable; release it once it has been sent.
                    using (var message = new BrokeredMessage())
                    {
                        Client.Send(message);
                    }
                }

                return base.OnStart();
            }

            /// <summary>
            /// Closes the queue client (which stops the message pump) and releases
            /// <see cref="Run"/> so the role can shut down.
            /// </summary>
            public override void OnStop()
            {
                try
                {
                    // Client is still null if OnStart failed before creating it.
                    if (Client != null)
                    {
                        // Close the connection to Service Bus Queue
                        Client.Close();
                    }
                }
                finally
                {
                    // Always release Run, even if closing the client threw;
                    // otherwise Run would stay blocked on the event.
                    CompletedEvent.Set(); // complete the Run function
                    base.OnStop();
                }
            }
        }
    }