Search code examples
c#azureazure-functionsazure-queues

Azure storage queue messages are moved all at once to poison queue after a period of time


Repro steps

Add messages to the queue (around ~500 messages). Add a queue triggered function app which will process the queue messages.

Expected behavior

All the messages from the queue are processed. If an exception is thrown while processing a message, the message is sent to poison queue.

Actual behavior

Messages start being processed. After a period of time (somewhere around 30-60 seconds) all the messages that are left in the queue are moved to the poison queue. No exception is thrown with the messages that are processed in that moment.

We checked that a queue message lifetime is the default one: 7 days. We tried downgrading WindowsAzure.Storage package to 7.2.1 but without success.

WindowsAzure.Storage version we are using is 8.5.0.

Could it be that the function app crashes and this will cause all the messages to be moved to the poison queue?

EDIT

This is how messages are added to the queue:

public async Task AddMessage()
{
    var queueModel = new QueueModel
    {
        // queue model properties
    };

    await AddMessageToQueueAsync(queueModel);
}


public async Task AddMessageToQueueAsync(T messageObject, TimeSpan? initialVisibilityDelay = null)
{
    var queue = GetQueue();
    var jsonMessage = JsonConvert.SerializeObject(messageObject);
    var message = new CloudQueueMessage(jsonMessage);

    await queue.AddMessageAsync(message, TimeSpan.FromDays(7), initialVisibilityDelay, new QueueRequestOptions(), new OperationContext());
}

private CloudQueue GetQueue()
{
    var queueClient = _storageAccount.CreateCloudQueueClient();

    var queue = queueClient.GetQueueReference(_queueName);
    queue.CreateIfNotExists();

    return queue;
}

And this is the function app which processes the messages:

[FunctionName("ProcessQueue")]
public static async Task Run([QueueTrigger("queue-name", Connection = "AzureWebJobsStorage")]string queueItem, TraceWriter log)
{
    if (String.IsNullOrEmpty(queueItem))
    {
        return;
    }

    var queueModel = JsonConvert.DeserializeObject<QueueModel>(queueItem);
    if (queueModel == null)
    {
        return;
    }

    try
    {
        // process the message
    }
    catch (Exception ex)
    {
        // message is moved to the poison-queue but no exception is thrown
        log.Error(ex.Message, ex);
    }
}

Solution

  • Found the issue. The problem was a leftover function app which listened to the same queue and processed the messages in parallel with the current function app. Because both function apps processed messages from the same queue, at some point, the messages left in the queue were all moved to the poison queue. (Maybe some concurrency issues when reading messages from the queue was the problem).