Because I need to remove duplicate messages and delay processing some messages which are "too new" (as determined by the most recent copy of a message), I want to handle the entire contents of a Service Bus queue all at once.
I'm not sure how many messages I can expect, but I'm fairly optimistic that it shouldn't normally be in the hundreds, never mind the thousands which I thought could be the limit for ReceiveAsync(int maxMessageCount, TimeSpan operationTimeout). However, it turns out that no matter how high that value is, I'm only able to read about 30 to 50 messages in a single invocation of:
private async Task<IList<MicrosoftMessage>> Receive(IQueueConfig queueConfig) =>
await _messageReceiverLookup.GetMessageReceiver(queueConfig.QueueName)
.ReceiveAsync(queueConfig.MaximumRecords, TimeSpan.FromSeconds(10));
I tried wrapping this with some additional logic, like:
List<MicrosoftMessage> messages = new();
List<MicrosoftMessage> newMessages = new();
do
{
newMessages = await ReceiveMessages(queueHandler, cancellationToken);
messages.AddRange(newMessages);
}
while (
newMessages.Count > 0
&& messages.Count > 0
&& messages.Count < queueHandler.QueueConfig.MaximumRecords
);
But I discovered that this never ends, because the system will read the same message multiple times.
So then I tried this:
Dictionary<string, MicrosoftMessage> previosMessagesByToken;
Dictionary<string, MicrosoftMessage> allMessagesByToken = new();
List<MicrosoftMessage> newMessages;
do
{
previosMessagesByToken = allMessagesByToken;
newMessages = await ReceiveMessages(queueHandler, cancellationToken);
Dictionary<string, MicrosoftMessage> newMessagesByToken = newMessages.ToDictionary(x => x.SystemProperties.LockToken, x => x);
// Ensure we only collect each message once!
allMessagesByToken = allMessagesByToken.Concat(newMessagesByToken.Where(kvp => !allMessagesByToken.ContainsKey(kvp.Key)))
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}
while (
newMessages.Count > 0
&& allMessagesByToken.Count > previosMessagesByToken.Count
&& allMessagesByToken.Count < queueHandler.QueueConfig.MaximumRecords
);
This seems to work, but I have a gut feeling it should not be so complicated. I also don't entirely trust it: I don't understand why I don't get all the messages, nor why I get duplicate messages, so I can't help feeling that this algorithm could let some messages fall through the cracks (a non-duplicate that never gets included).
Is there a better way to get all the messages?
A few foundational assumptions:
Messages received in PeekLock mode will have their lock expire at some point and will then be delivered again. If your objective is to drain all the messages, you should either complete the messages you've received or receive in ReceiveAndDelete mode. That way you won't get the same messages again. If you are trying to peek at the messages in the queue, then your LockDuration would need to be long enough to ensure all messages have been peeked.
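To make the ReceiveAndDelete suggestion concrete, here is a minimal sketch, assuming the legacy Microsoft.Azure.ServiceBus package that the question's code appears to use. QueueDrainer, DrainAsync, and the connectionString, queueName and maxRecords parameters are hypothetical names for illustration, not part of the original code. A batch receive returns whatever is available at that moment, up to the requested count, which is why one call can come back with far fewer messages than asked for and a loop is still needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class QueueDrainer
{
    // Hypothetical helper: all three parameters are placeholders supplied by the caller.
    public static async Task<List<Message>> DrainAsync(
        string connectionString, string queueName, int maxRecords)
    {
        // In ReceiveAndDelete mode a message is removed from the queue as soon as
        // it is handed to the receiver, so it cannot be delivered a second time.
        var receiver = new MessageReceiver(
            connectionString, queueName, ReceiveMode.ReceiveAndDelete);
        var messages = new List<Message>();

        try
        {
            while (messages.Count < maxRecords)
            {
                IList<Message> batch = await receiver.ReceiveAsync(
                    maxRecords - messages.Count, TimeSpan.FromSeconds(10));

                // The call may return null rather than an empty list when nothing
                // arrives within the timeout, so check for both.
                if (batch == null || batch.Count == 0)
                {
                    break;
                }

                messages.AddRange(batch);
            }
        }
        finally
        {
            await receiver.CloseAsync();
        }

        return messages;
    }
}
```

The trade-off is that ReceiveAndDelete gives up redelivery: if the process fails after receiving, those messages are gone. To stay in PeekLock mode instead, the same loop could call receiver.CompleteAsync with the batch's SystemProperties.LockToken values before asking for the next batch, which also stops the same messages from coming back.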
I need to remove duplicate messages and delay processing some messages which are "too new" (as determined by the most recent copy of a message), I want to handle the entire contents of Service Bus Queue all at once.
The bigger issue seems to be the attempt to handle messages in the queue like records in a database. Duplicate detection is already a feature of Azure Service Bus, and so is message deferral. Using them, however, would require a different approach than batch processing.
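As a rough illustration of those two features, here is a sketch that again assumes the legacy Microsoft.Azure.ServiceBus package. QueueFeatures, both method names, the ten-minute history window and the minimumAge rule are assumptions made for the example. The question decides "too new" from the most recent copy of a message, so the real rule would differ. Duplicate detection has to be enabled when the queue is created, and it compares MessageId values, so the sender would need to set MessageId to a stable business key.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus.Management;

public static class QueueFeatures
{
    // Creates a queue that silently drops any message whose MessageId was
    // already seen within the history window (10 minutes is an arbitrary choice).
    public static async Task CreateDedupQueueAsync(string connectionString, string queueName)
    {
        var management = new ManagementClient(connectionString);
        try
        {
            await management.CreateQueueAsync(new QueueDescription(queueName)
            {
                RequiresDuplicateDetection = true,
                DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
            });
        }
        finally
        {
            await management.CloseAsync();
        }
    }

    // Defers a PeekLock-received message that is younger than minimumAge.
    // Returns the sequence number needed to fetch it later, or null if the
    // message is old enough to process now.
    public static async Task<long?> DeferIfTooNewAsync(
        IMessageReceiver receiver, Message message, TimeSpan minimumAge)
    {
        TimeSpan age = DateTime.UtcNow - message.SystemProperties.EnqueuedTimeUtc;
        if (age >= minimumAge)
        {
            return null;
        }

        // Read the sequence number before deferring; a deferred message can
        // only be retrieved again by this number.
        long sequenceNumber = message.SystemProperties.SequenceNumber;
        await receiver.DeferAsync(message.SystemProperties.LockToken);
        return sequenceNumber;
    }
}
```

A deferred message stays in the queue but is skipped by normal receives. It can be picked up later with receiver.ReceiveDeferredMessageAsync(sequenceNumber), so the returned sequence numbers have to be stored somewhere durable.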