I've got a queue-triggered Azure webjob function:
public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
// Do normal processing...
}
I want to selectively ignore certain queue items based on a condition. For example, if I'm out of Widget X, I don't want to keep processing its shipments until I have stock, so I want to ignore/skip all Widget X queue items temporarily. Something like:
public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
if (!HaveStock(WidgetX))
// Ignore queue item - treat it as if it was not in the queue at all
// Do normal processing...
}
Throwing an exception to fail processing doesn't seem like an option to me because it'll poison the message.
I could set the VisibilityTimeout to some arbitrarily high value and then throw an exception. But then, when I wanted to resume processing (as opposed to just waiting for the timeout), I'd need a separate process that scans for queue items with the higher VisibilityTimeout and lowers it (I'm not even sure this is possible). Writing a process to scan those queue items is what I'm trying to avoid in the first place, as I'd like to keep the logic in the queue-triggered function if possible.
What options do I have?
Update: I recognize that what I'm trying to accomplish is inconsistent with the purpose of queue-triggered functions in the first place given that the reason for queue-triggered events is to do something when a message appears, and that if I want selective processing I could implement a queue-polling and message handling process myself. But I'm trying to take advantage of the higher level of abstraction that triggered processing provides.
I would say: let a queue be a queue, and let FIFO be FIFO.
If you want conditional message processing with a queue, I suggest two options.
1. Use Azure Service Bus Topics
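For selective FIFO filtering, replacing the Azure Storage queue with an Azure Service Bus topic is one candidate, since each subscription on a topic can apply its own filter to the messages it receives.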
See the tutorial to get a hint: https://azure.microsoft.com/en-us/documentation/articles/websites-dotnet-webjobs-sdk-service-bus/#topics
2. Use a secondary queue
Similar to the poison queue, send the ignored items to a secondary queue and process them later with separate logic. I recommend this option because the flow is easy to follow.
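// One-time setup, e.g. in a static constructor (storageAccount and storageQueueName are defined elsewhere).
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
_secondaryQueue = queueClient.GetQueueReference(storageQueueName);
_secondaryQueue.CreateIfNotExists();

// Declared async Task (not void) so that the await below compiles.
public static async Task ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    if (!HaveStock(WidgetX))
    {
        // Send to the secondary queue; returning normally removes the item from the main queue.
        await _secondaryQueue.AddMessageAsync(new CloudQueueMessage(JsonConvert.SerializeObject(item)));
        return;
    }
    // Do normal processing...
}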
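As a variation on option 2, the WebJobs SDK's Queue output binding can enqueue the deferred item for you, so there is no CloudQueue to create by hand. This is only a sketch under assumptions: the queue name "widget-backlog", the WidgetType property, and the HaveStock stub are hypothetical stand-ins, not part of the original code.

```csharp
using System.IO;
using Microsoft.Azure.WebJobs;

public class Item
{
    // Hypothetical property used to decide which widget the item refers to.
    public string WidgetType { get; set; }
}

public static class Functions
{
    // Hypothetical stock check; replace with the real inventory lookup.
    private static bool HaveStock(string widgetType)
    {
        return widgetType != "WidgetX";
    }

    public static void ProcessQueueMessage(
        [QueueTrigger("%MyQueue%")] Item item,
        [Queue("widget-backlog")] ICollector<Item> backlog, // assumed queue name
        TextWriter logger)
    {
        if (!HaveStock(item.WidgetType))
        {
            // The binding serializes the item and adds it to the backlog queue.
            backlog.Add(item);
            logger.WriteLine("Deferred item for {0}", item.WidgetType);
            return; // Normal return: the message leaves the main queue without being poisoned.
        }

        // Do normal processing...
    }
}
```

The backlog still needs something to drain it once stock returns, for example a scheduled or manually invoked function, so that it is not consumed immediately by another queue trigger.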