Search code examples
azureazure-webjobsazure-webjobssdkazure-queues

How can I disable Azure webjob storage queue processing for specific queue items?


I've got a queue-triggered Azure webjob function:

public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
    {
       // Do normal processing...
    }

I want to selectively ignore certain queue items based on a condition. For example if I'm out of Widget X and I don't want to keep processing widget shipments until I have stock. So I want to ignore/skip all Widget X queue items temporarily. So something like:

public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    if (!HaveStock(WidgetX))
       // Ignore queue item - treat it as if it was not in the queue at all

    // Do normal processing...
}

Throwing an exception to fail processing doesn't seem like an option to me because it'll poison the message.

I could set the VisibilityTimeout to some arbitrarily high value and then throw an exception but then when I wanted to perform the processing (as opposed to just waiting for the timeout) I'd have to have a separate process scan for queue items with the higher VisibilityTimeout and set the timeout to a lower value (not even sure if this is possible). But making a process to scan those queue items is what I'm trying to avoid in the first place as I'd like to keep the logic in the queue-triggered function if possible.

What options do I have?

Update: I recognize that what I'm trying to accomplish is inconsistent with the purpose of queue-triggered functions in the first place given that the reason for queue-triggered events is to do something when a message appears, and that if I want selective processing I could implement a queue-polling and message handling process myself. But I'm trying to take advantage of the higher level of abstraction that triggered processing provides.


Solution

  • I would like to say; Let Queue be Queue, and let FIFO be FIFO.

    If you want to handle conditional message processing with queue, I suggest two options.

    1. Use Azure Service Bus Topics

    For selective FIFO filter, replacing Azure Queue with Azure Service Bus Topic can be a candidate.

    See the tutorial to get a hint: https://azure.microsoft.com/en-us/documentation/articles/websites-dotnet-webjobs-sdk-service-bus/#topics

    2. Use secondary Queue

    Similar to poisoned-queue, send ignored one to secondary Queue and process them later in another logic. I recommend this as we can understand flows easily.

    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    _secondaryQueue= queueClient.GetQueueReference(storageQueueName);
    _secondaryQueue.CreateIfNotExists();
    
    public static void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
    {
        // send to secondary queue.
        await _secondaryQueue.AddMessageAsync(new CloudQueueMessage(JsonConvert.SerializeObject(item)));
    }