
Azure Service Bus Queues: How To Read Individual Messages From A Queue


I have an application that writes a message in JSON format to an Azure Service Bus queue in one part of my process. I'd like a downstream process to pop that message off the queue, translate the JSON into an object, and then process that object.

I have no problem pushing messages onto the queue, but I have not been able to find any examples of popping messages off the queue one at a time or in a loop. EVERY example that I've seen from Microsoft or on GitHub is a console application (useless in a web application) that sets up some kind of listener that grabs all the messages in a queue and writes them to the console. I haven't found any examples where the message is popped and then some processing is done with the data. Does anyone have an example of how to pop a message off the queue and then process it, or call another method to do something with the data in the message?

Update: I used the WindowsAzure.ServiceBus example below supplied by Guru Pasupathy and ended up using the following snippet to get the text of the message from the BrokeredMessage object:

Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string messageBody =  reader.ReadToEnd();

I could then take messageBody and deserialize the embedded JSON into a POCO object and I was on my way! Now I can use queues more effectively in my application for a variety of tasks.
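
For anyone following along, the deserialization step might look roughly like the sketch below. It assumes System.Text.Json is available (Newtonsoft.Json would work similarly), and QueueItem and MessageBodyParser are hypothetical names invented for illustration; the real POCO has to match whatever JSON your producer writes.

```csharp
using System.Text.Json;

// Hypothetical POCO: the property names are assumptions and must match
// the JSON that the sending side actually puts on the queue.
public class QueueItem
{
    public int Id { get; set; }
    public string Message { get; set; }
}

public static class MessageBodyParser
{
    // Accept "id" as well as "Id", since producers often emit camelCase JSON.
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true
    };

    // Turns the text read from the message stream into a typed object.
    public static QueueItem Parse(string messageBody)
    {
        return JsonSerializer.Deserialize<QueueItem>(messageBody, Options);
    }
}
```

With this in place, the string read from the stream can be passed straight in, for example `QueueItem item = MessageBodyParser.Parse(messageBody);`.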


Solution

  • You can use the Peek Lock receive mode to get a message from the queue, process it, and then either Abandon it or Complete it based on your business logic.

    If you are using the WindowsAzure.ServiceBus NuGet package to send and receive messages, the method below can be used to consume messages from the queue one at a time, either in a loop or with a single call, without having to use a listener.

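            public void Receive()
            {
                // Pass the connection string and the queue name as separate arguments.
                QueueClient myQueueClient = QueueClient.CreateFromConnectionString("<connectionString>", "<queueName>", ReceiveMode.PeekLock);

                int someCount = 2; // arbitrary number of messages to read, for testing
                for (int i = 0; i < someCount; i++)
                {
                    // Receive returns null if no message arrives within the wait time.
                    BrokeredMessage message = myQueueClient.Receive(TimeSpan.FromSeconds(5));
                    if (message == null)
                    {
                        break; // nothing left to read right now
                    }

                    try
                    {
                        // Process the message here, then remove it from the queue.
                        Console.WriteLine("The message id is " + message.MessageId);
                        message.Complete();
                    }
                    catch (Exception)
                    {
                        // Processing failed: release the lock so the message can be retried.
                        message.Abandon();
                    }
                }
            }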
    

    If you are using the Microsoft.Azure.ServiceBus NuGet package, I could not find a direct way to read only a single message without using a listener. As far as I can see, the listener keeps polling and processing the queue until there are no more messages.

    If you want to stop the listener from fetching every available message, then as a workaround you can close the QueueClient instance (qc in the code below) after reading a single message, and then reopen it and register the handler again when your process is ready to take the next one.

            public async Task ProcessMessagesAsync(Message message, CancellationToken token)
            {
                Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");
    
                // Note: this only works if the sender serialized Payload with BinaryFormatter.
                BinaryFormatter bf = new BinaryFormatter();
                using (MemoryStream ms = new MemoryStream(message.Body))
                {
                    Payload payload = (Payload) bf.Deserialize(ms);
    
                    //Depending on your needs, you may have a condition here that decides whether to Abandon or Complete the message
    
                    await qc.CompleteAsync(message.SystemProperties.LockToken);
                    Console.WriteLine("Completed the message --> " + payload.Message + " -- Id --> " + payload.Id);
    
                    //await qc.AbandonAsync(message.SystemProperties.LockToken);
                    //Console.WriteLine("Abandon the message --> " + payload.Message + " -- Id --> " + payload.Id);
    
                    qc.CloseAsync(); //If you close the QueueClient instance here, no more messages will be picked up from queue.
                }
            }
    

    The above example is based on https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues#receive-messages-from-the-queue . I just added the qc.CloseAsync() call at the end. Without this line, the listener will continue processing until there are no more messages left in the queue. I am not sure whether there are better ways to achieve this, but I thought it was worth sharing.
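
    One alternative that may be worth trying: the Microsoft.Azure.ServiceBus package also ships a MessageReceiver class (in the Microsoft.Azure.ServiceBus.Core namespace) that, as far as I know, lets you pull a single message without registering a handler. The following is only a sketch under that assumption. SingleMessageReader and ReceiveOneAsync are hypothetical names, the five-second wait is arbitrary, and the body is assumed to be UTF-8 text.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class SingleMessageReader
{
    // Returns the body of one message as text, or null if nothing
    // arrived within the wait time. Assumes the body is UTF-8 text.
    public static async Task<string> ReceiveOneAsync(string connectionString, string queueName)
    {
        var receiver = new MessageReceiver(connectionString, queueName, ReceiveMode.PeekLock);
        try
        {
            // Wait up to 5 seconds (arbitrary) for a single message.
            Message message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                return null; // queue was empty for the whole wait
            }

            string body = Encoding.UTF8.GetString(message.Body);

            // Do any processing that must succeed before this point;
            // completing removes the message from the queue.
            await receiver.CompleteAsync(message.SystemProperties.LockToken);
            return body;
        }
        finally
        {
            await receiver.CloseAsync();
        }
    }
}
```

    Creating and closing a receiver on every call keeps the sketch short; in a real application you would probably keep one receiver open and reuse it.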

    Hope this helps

    EDIT -

    When sending the message, if you are using a custom type you can use the below

                    BrokeredMessage message = new BrokeredMessage(new Payload() { Id = 4332, Message = "WindowsAzure package" });
                    myQueueClient.Send(message);
    

    and while receiving you should use the GetBody as shown below

                    BrokeredMessage message = myQueueClient.Receive();
                    var incoming = message.GetBody<Payload>();
                    Console.WriteLine("The message is " + incoming.Id + " and " + incoming.Message);
                    message.Complete();
    

    Below is the custom object for your reference

        [Serializable]
        public class Payload
        {
            public int Id { get; set; }
            public string Message { get; set; }
        }
    

    Same applies to string also

    While receiving you can use

    var incoming = message.GetBody<string>();
    

    While sending you can send as

    BrokeredMessage message = new BrokeredMessage("WindowsAzure package" );
    

    You can find more details on the different content formatting options at the link below: https://abhishekrlal.com/2012/03/30/formatting-the-content-for-service-bus-messages/