I'm trying to process multiple messages, from a queue with sessions enabled, in parallel. I've tried setting MaxConcurrentCallsPerSession to 5 for example but I am still receiving 1 message at a time.
I wrote a console application to demonstrate what I'm trying to do:
static void Main()
{
MainAsync().Wait();
}
static async Task MainAsync()
{
//create the queue
await CreateQueue();
//initialize queue client
ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
});
//initialize the sender
ServiceBusSender sender = queueClient.CreateSender(_queueName);
//queue 3 messages
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "1" });
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "2" });
await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "3" });
//initialize processor
ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
{
AutoCompleteMessages = false,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
SessionIds = { _sessionId },
PrefetchCount = 5,
MaxConcurrentCallsPerSession = 5
});
//add message handler
processor.ProcessMessageAsync += HandleReceivedMessage;
//add error handler
processor.ProcessErrorAsync += ErrorHandler;
//start the processor
await processor.StartProcessingAsync();
Console.ReadLine();
}
static async Task CreateQueue()
{
ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);
bool doesQueueExist = await client.QueueExistsAsync(_queueName);
//check if the queue exists, if not then create one
if (!doesQueueExist)
{
_ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
{
RequiresSession = true,
DeadLetteringOnMessageExpiration = true,
MaxDeliveryCount = 3,
EnableBatchedOperations = true,
});
}
}
static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
{
Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);
await Task.Delay(5000).ConfigureAwait(false);
await sessionMessage.CompleteMessageAsync(sessionMessage.Message);
Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
}
static Task ErrorHandler(ProcessErrorEventArgs e)
{
Console.WriteLine("Error received");
return Task.CompletedTask;
}
When executing the program, what I expect to receive is:
Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3
But what I am getting is:
Received message: 1
Completed message: 1
Received message: 2
Completed message: 2
Received message: 3
Completed message: 3
Is what I am trying to achieve possible please?
I am using .NetFramework 4.7.2 and Azure.Messaging.ServiceBus 7.17.4
I wrote a similar question on the library’s GitHub repository to clarify how it works since the documentation doesn’t explicitly mention that queues with sessions don’t support parallel message processing. In response, I received confirmation that this is indeed a bug in the library, and a fix will be rolled out soon.