Azure Service Bus - processing multiple messages in parallel (MaxConcurrentCallsPerSession)


I'm trying to process multiple messages from a session-enabled queue in parallel. I've tried setting MaxConcurrentCallsPerSession to 5, for example, but I am still receiving one message at a time.

I wrote a console application to demonstrate what I'm trying to do:

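        static void Main()
        {
            //GetAwaiter().GetResult() rethrows the original exception instead of wrapping it in an AggregateException
            MainAsync().GetAwaiter().GetResult();
        }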

        static async Task MainAsync()
        {
            //create the queue
            await CreateQueue();

            //initialize queue client
            ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
            {
                TransportType = ServiceBusTransportType.AmqpWebSockets,
            });

            //initialize the sender
            ServiceBusSender sender = queueClient.CreateSender(_queueName);

            //queue 3 messages
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "1" });
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "2" });
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "3" });

            //initialize processor
            ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
            {
                AutoCompleteMessages = false,
                ReceiveMode = ServiceBusReceiveMode.PeekLock,
                SessionIds = { _sessionId },
                PrefetchCount = 5,
                MaxConcurrentCallsPerSession = 5
            });

            //add message handler
            processor.ProcessMessageAsync += HandleReceivedMessage;

            //add error handler
            processor.ProcessErrorAsync += ErrorHandler;

            //start the processor
            await processor.StartProcessingAsync();

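            //keep the process alive until Enter is pressed, then stop receiving
            Console.ReadLine();
            await processor.StopProcessingAsync();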
        }

        static async Task CreateQueue()
        {
            ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);

            bool doesQueueExist = await client.QueueExistsAsync(_queueName);

            //check if the queue exists, if not then create one
            if (!doesQueueExist)
            {
                _ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
                {
                    RequiresSession = true,
                    DeadLetteringOnMessageExpiration = true,
                    MaxDeliveryCount = 3,
                    EnableBatchedOperations = true,
                });
            }
        }

        static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
        {
            Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);

            await Task.Delay(5000).ConfigureAwait(false);

            await sessionMessage.CompleteMessageAsync(sessionMessage.Message);

            Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
        }

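        static Task ErrorHandler(ProcessErrorEventArgs e)
        {
            //log where the error came from and the exception itself, otherwise failures are hard to diagnose
            Console.WriteLine("Error received (" + e.ErrorSource + "): " + e.Exception);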

            return Task.CompletedTask;
        }

When I run the program, I expect the following output:

Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3

But what I actually get is:

Received message: 1
Completed message: 1
Received message: 2
Completed message: 2
Received message: 3
Completed message: 3

Is it possible to process multiple messages from the same session in parallel?

I am using .NET Framework 4.7.2 and Azure.Messaging.ServiceBus 7.17.4.


Solution

  • I asked a similar question on the library's GitHub repository to clarify how this works, since the documentation doesn't state explicitly whether queues with sessions support parallel message processing. In response, I received confirmation that the behavior described above is a bug in the library and that a fix will be rolled out soon.

    Link to the question on GitHub
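
    To check whether a later version of the library actually runs the handler concurrently, it can help to measure the overlap directly rather than read it off the console output. The following is a minimal sketch, not part of Azure.Messaging.ServiceBus: ConcurrencyProbe and ProbeDemo are hypothetical names, and the demo simulates handler calls locally instead of connecting to a queue.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a Service Bus type): counts how many wrapped calls
// are in flight at the same time and remembers the highest count seen.
public sealed class ConcurrencyProbe
{
    private int _inFlight;
    private int _peak;

    public int Peak => Volatile.Read(ref _peak);

    public async Task RunAsync(Func<Task> work)
    {
        int current = Interlocked.Increment(ref _inFlight);

        // Raise the recorded peak with a compare-and-swap loop so that
        // concurrent callers cannot overwrite a higher value.
        int seen;
        while (current > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, current, seen) != seen)
        {
        }

        try
        {
            await work().ConfigureAwait(false);
        }
        finally
        {
            Interlocked.Decrement(ref _inFlight);
        }
    }
}

public static class ProbeDemo
{
    public static void Main()
    {
        RunAsync().GetAwaiter().GetResult();
    }

    private static async Task RunAsync()
    {
        // One call at a time, like the output observed in the question.
        var sequential = new ConcurrencyProbe();
        for (int i = 0; i < 3; i++)
        {
            await sequential.RunAsync(() => Task.Delay(10));
        }
        Console.WriteLine("Peak when awaited one by one: " + sequential.Peak);

        // Three calls started before any of them finishes, like the expected output.
        var overlapping = new ConcurrencyProbe();
        var gate = new TaskCompletionSource<bool>();
        Task[] calls = Enumerable.Range(0, 3)
            .Select(_ => overlapping.RunAsync(() => gate.Task))
            .ToArray();
        gate.SetResult(true);
        await Task.WhenAll(calls);
        Console.WriteLine("Peak when started together: " + overlapping.Peak);
    }
}
```

    In the console application from the question, the body of HandleReceivedMessage could be wrapped in a call to RunAsync on a shared probe, with Peak printed after each message. If Peak stays at 1 while MaxConcurrentCallsPerSession is 5 and several messages are waiting in the session, the messages are still being handled one at a time.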