
Suggestions for Implementing Session Block for Service Bus


Currently, we have a .NET Core app with a background service that receives messages from a session-enabled Service Bus queue, where the sessionId is the userId and each message contains updates to user info. We would now like to temporarily pause updates for a specific user by blocking that userId/sessionId, while still processing the messages in order once the user is unblocked. What would be the best way to approach this problem?

I've looked through the Service Bus documentation and samples, mainly message deferral, message session state, and the session state sample.

I found some information on session state and message deferral, and I wonder whether they can be used to implement this feature while still guaranteeing processing order (FIFO, regardless of whether a message was deferred). I was thinking of storing the sequence number in the session state, receiving deferred messages by that number, and incrementing it to get the next message until I run out of messages.

Currently, our code looks something like this:

            this.queue.RegisterSessionHandler(
                this.SessionHandler,
                new SessionHandlerOptions(this.ExceptionHandler)
                {
                    AutoComplete = false,
                    MessageWaitTimeout = TimeSpan.FromMinutes(1),
                });

Here this.SessionHandler is a function that processes the message, then completes it and closes the session by calling session.CompleteAsync and session.CloseAsync. However, I'm having trouble working out how to add the deferral logic to our code. RegisterSessionHandler already handles the session locks and (I assume) load-balances across sessionIds, which is great, but it doesn't let you specify a particular sessionId to process.

Say I have a few messages deferred for userId/sessionId A. When I want to unblock processing for this user, I cannot simply put the deferred messages back on the queue: the sender keeps sending messages for user A, so re-enqueueing would mess up the order.

The session state sample I mentioned above is a good example of using session state and processing deferred messages. However, it only uses one sessionId and doesn't use RegisterSessionHandler. My question is: if we want to implement deferred message processing (with order preserved), do we have to implement our own RegisterSessionHandler and deal with sessionId load balancing ourselves?

Thanks in advance!


Solution

  • Use SessionClient instead of QueueClient.RegisterSessionHandler to handle the deferral scenario and preserve order. You can keep a step/sequence number in the message body and record the last processed step/sequence in session state whenever you actually process a message. Session state tracks a handler's processing state for a session, so that processing can move between nodes (including on failover) without losing its place. The sample below combines the deferral and session features: it uses session state to track the progress of a workflow whose inputs arrive out of the expected order, and it defers any message whose Step is not the next one expected. Note that the sender code deliberately sends messages in an unpredictable order; the receiver restores the order by using the session state.

    //   
    //   Copyright © Microsoft Corporation, All Rights Reserved
    // 
    //   Licensed under the Apache License, Version 2.0 (the "License"); 
    //   you may not use this file except in compliance with the License. 
    //   You may obtain a copy of the License at
    // 
    //   http://www.apache.org/licenses/LICENSE-2.0 
    // 
    //   THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
    //   OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
    //   ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
    //   PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
    // 
    //   See the Apache License, Version 2.0 for the specific language
    //   governing permissions and limitations under the License. 
    
    namespace SessionState
    {
        using Microsoft.Azure.ServiceBus;
        using Microsoft.Azure.ServiceBus.Core;
        using Newtonsoft.Json;
        using System;
        using System.Collections.Generic;
        using System.Text;
        using System.Threading.Tasks;
    
        public class Program : MessagingSamples.Sample
        {
            public async Task Run(string connectionString)
            {
                Console.WriteLine("Press any key to exit the scenario");
    
                var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
                var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
                var receiveTask = this.ReceiveMessagesAsync(connectionString, SessionQueueName);
    
                await Task.WhenAll(sendTask, sendTask2, receiveTask);
            }
    
            async Task SendMessagesAsync(string session, string connectionString, string queueName)
            {
                var sender = new MessageSender(connectionString, queueName);
    
    
                Console.WriteLine("Sending messages to Queue...");
    
                ProcessingState[] data = new[]
                {
                    new ProcessingState {Step = 1, Title = "Buy"},
                    new ProcessingState {Step = 2, Title = "Unpack"},
                    new ProcessingState {Step = 3, Title = "Prepare"},
                    new ProcessingState {Step = 4, Title = "Cook"},
                    new ProcessingState {Step = 5, Title = "Eat"},
                };
    
                var rnd = new Random();
                var tasks = new List<Task>();
                for (int i = 0; i < data.Length; i++)
                {
                    var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
                    {
                        SessionId = session,
                        ContentType = "application/json",
                        Label = "RecipeStep",
                        MessageId = i.ToString(),
                        TimeToLive = TimeSpan.FromMinutes(2)
                    };
    
                    tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
                          async (t) =>
                          {
                              await sender.SendAsync(message);
                              lock (Console.Out)
                              {
                                  Console.ForegroundColor = ConsoleColor.Yellow;
                                  Console.WriteLine("Message sent: Id = {0}", message.MessageId);
                                  Console.ResetColor();
                              }
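                              }).Unwrap()); // Unwrap so Task.WhenAll waits for the send itself, not only for the continuation to start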
                }
                await Task.WhenAll(tasks);
            }
    
            async Task ReceiveMessagesAsync(string connectionString, string queueName)
            {
                var client = new SessionClient(connectionString, queueName, ReceiveMode.PeekLock);
    
                while (true)
                {
                    var session = await client.AcceptMessageSessionAsync();
                    await Task.Run(
                        async () =>
                        {
                            ProcessingState processingState;
    
                            var stateData = await session.GetStateAsync();
                            if (stateData != null)
                            {
                                processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
                            }
                            else
                            {
                                processingState = new ProcessingState
                                {
                                    LastProcessedRecipeStep = 0,
                                    DeferredSteps = new Dictionary<int, long>()
                                };
                            }
    
                            while (true)
                            {
                                try
                                {
                                    //receive messages from Queue
                                    var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
                                    if (message != null)
                                    {
                                        if (message.Label != null &&
                                            message.ContentType != null &&
                                            message.Label.Equals("RecipeStep", StringComparison.InvariantCultureIgnoreCase) &&
                                            message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
                                        {
                                            var body = message.Body;
    
                                            ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                            if (recipeStep.Step == processingState.LastProcessedRecipeStep + 1)
                                            {
                                                lock (Console.Out)
                                                {
                                                    Console.ForegroundColor = ConsoleColor.Cyan;
                                                    Console.WriteLine(
                                                        "\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
                                                        "\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4},  \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
                                                        message.MessageId,
                                                        message.SystemProperties.SequenceNumber,
                                                        message.SystemProperties.EnqueuedTimeUtc,
                                                        message.ContentType,
                                                        message.Size,
                                                        message.ExpiresAtUtc,
                                                        recipeStep.Step,
                                                        recipeStep.Title);
                                                    Console.ResetColor();
                                                }
                                                await session.CompleteAsync(message.SystemProperties.LockToken);
                                                processingState.LastProcessedRecipeStep = recipeStep.Step;
                                                await
                                                    session.SetStateAsync(
                                                        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                            }
                                            else
                                            {
                                                // In your case, if updates for this user are blocked, you can defer the message here.
                                                processingState.DeferredSteps.Add((int)recipeStep.Step, (long)message.SystemProperties.SequenceNumber);
                                                await session.DeferAsync(message.SystemProperties.LockToken);
                                                await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                            }
                                        }
                                        else
                                        {
                                            await session.DeadLetterAsync(message.SystemProperties.LockToken);//, "ProcessingError", "Don't know what to do with this message");
                                        }
                                    }
                                    else
                                    {
                                        while (processingState.DeferredSteps.Count > 0)
                                        {
                                            long step;
    
                                            if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep + 1, out step))
                                            {
                                                var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
                                                var body = deferredMessage.Body;
                                                ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                                lock (Console.Out)
                                                {
                                                    Console.ForegroundColor = ConsoleColor.Cyan;
                                                    Console.WriteLine(
                                                        "\t\t\t\tdeferredMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
                                                        "\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4},  \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
                                                        deferredMessage.MessageId,
                                                        deferredMessage.SystemProperties.SequenceNumber,
                                                        deferredMessage.SystemProperties.EnqueuedTimeUtc,
                                                        deferredMessage.ContentType,
                                                        deferredMessage.Size,
                                                        deferredMessage.ExpiresAtUtc,
                                                        recipeStep.Step,
                                                        recipeStep.Title);
                                                    Console.ResetColor();
                                                }
                                                await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
                                                processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep + 1;
                                                processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
                                                await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
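                                            }
                                            else
                                            {
                                                // The next expected step has not arrived yet: stop draining instead of spinning forever.
                                                break;
                                            }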
                                        }
                                        break;
                                    }
                                }
                                catch (ServiceBusException e)
                                {
                                    if (!e.IsTransient)
                                    {
                                        Console.WriteLine(e.Message);
                                        throw;
                                    }
                                }
                            }
                            await session.CloseAsync();
                        });
                }
            }
    
            public static int Main(string[] args)
            {
                try
                {
                    var app = new Program();
                    app.RunSample(args, app.Run);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.ToString());
                    return 1;
                }
                return 0;
            }
    
            class ProcessingState
            {
                [JsonProperty]
                public int LastProcessedRecipeStep { get; set; }
                [JsonProperty]
                public Dictionary<int, long> DeferredSteps { get; set; }
                [JsonProperty]
                public int Step { get; internal set; }
                [JsonProperty]
                public string Title { get; internal set; }
            }
        }
    }
    

    You can also read Ordering Messages in Azure Service Bus, which explains the concept briefly and well. The sample provided there differs slightly from the one above.
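
For your blocking case specifically, the bookkeeping you keep in session state might look like the minimal sketch below. `DeferralLedger` and its members are hypothetical names, not part of the SDK; the comments point at the calls from the sample above (`DeferAsync`, `ReceiveDeferredMessageAsync`, `SetStateAsync`) where each method would be used. It assumes that, within a single session, a lower sequence number means an earlier message, and that you persist the ledger with `SetStateAsync` after every change.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not an SDK type): per-session record of deferred messages.
// Serialize it (for example as JSON) and store it with session.SetStateAsync.
public sealed class DeferralLedger
{
    // Sequence numbers of the messages deferred while the user was blocked.
    public List<long> DeferredSequenceNumbers { get; set; } = new List<long>();

    public bool HasDeferred => DeferredSequenceNumbers.Count > 0;

    // Call after session.DeferAsync(lockToken) succeeds for a blocked user.
    public void RecordDeferred(long sequenceNumber)
    {
        DeferredSequenceNumbers.Add(sequenceNumber);
    }

    // The sequence number to pass to session.ReceiveDeferredMessageAsync next,
    // or null once everything has been drained. Assumption: the smallest
    // sequence number in a session is the oldest message.
    public long? PeekNext()
    {
        return HasDeferred ? DeferredSequenceNumbers.Min() : (long?)null;
    }

    // Call after the deferred message was processed and completed.
    public void MarkCompleted(long sequenceNumber)
    {
        DeferredSequenceNumbers.Remove(sequenceNumber);
    }
}
```

When the user is unblocked, drain the ledger completely (PeekNext, ReceiveDeferredMessageAsync, process, CompleteAsync, MarkCompleted, SetStateAsync) before calling ReceiveAsync again, so that newer messages never overtake the deferred ones.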

    WARNING: Using message sessions also means that all messages in a session (in your case, a user ID) are always received and processed by a single receiver at a time. So be mindful of how you choose the SessionId. If a session is very large, Azure Service Bus must send most messages to one receiver, which reduces the benefit of concurrent processing. If sessions are too granular, they lose their intended benefit and simply add overhead.
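
Since each session is handled by one receiver at a time, throughput with SessionClient comes from working on several sessions concurrently, which RegisterSessionHandler used to do for you. Note that the sample above awaits each session before accepting the next one. A minimal sketch of a bounded concurrent accept loop follows. `SessionPump`, `RunAsync` and the delegate parameters are hypothetical names: `acceptNextSession` stands in for something like `() => client.AcceptMessageSessionAsync()`, and `handleSession` for your per-session receive/defer/complete loop. A real accept call can also time out when no session has messages; this sketch assumes the delegate retries, or lets the exception end the pump.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not an SDK type): keeps at most maxConcurrent sessions in flight.
public static class SessionPump
{
    public static async Task RunAsync<TSession>(
        Func<Task<TSession>> acceptNextSession, // assumed wrapper around the accept call
        Func<TSession, Task> handleSession,     // assumed per-session processing loop
        int maxConcurrent,
        CancellationToken stop)
    {
        using (var slots = new SemaphoreSlim(maxConcurrent))
        {
            async Task HandleAndReleaseAsync(TSession session)
            {
                try { await handleSession(session); }
                catch (Exception e) { Console.Error.WriteLine(e); } // log, keep the pump alive
                finally { slots.Release(); }                        // free the slot
            }

            try
            {
                while (!stop.IsCancellationRequested)
                {
                    await slots.WaitAsync(stop); // wait until a slot is free

                    TSession session;
                    try { session = await acceptNextSession(); }
                    catch { slots.Release(); throw; } // nothing accepted: return the slot

                    // Not awaited on purpose, so the next session can be accepted right away.
                    _ = HandleAndReleaseAsync(session);
                }
            }
            catch (OperationCanceledException)
            {
                // Stop was requested; fall through to the drain below.
            }
            finally
            {
                // Drain: once every slot can be re-acquired, no handler is still running.
                for (var i = 0; i < maxConcurrent; i++)
                {
                    await slots.WaitAsync();
                }
            }
        }
    }
}
```

The semaphore only bounds how many sessions this process works on at once; Service Bus still decides which receiver gets which session, so ordering within a session is unaffected.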