Search code examples
azureazure-functionsazureservicebusazure-durable-functions

Queue Trigger messages for Durable Functions delivered multiple times


I have a durable function invoked by a queue trigger

This durable function takes time to run, and I think this is then meaning that the message is being resent from the queue to the function multiple times

public async Task RunOrchestratorAsync(
[ServiceBusTrigger("my-queue", Connection = "event-bus-connection")] string queueItem,
            [DurableClient] IDurableOrchestrationClient starter)
{
    var dto = JsonConvert.DeserializeObject<MyObject>(queueItem);
    await starter.StartNewAsync("OrchestratorFunction", dto);
}

enter image description here

As you can see, there are multiple deliveries for the messages

How can I stop this?

Paul


Solution

  • I agree with @Stephen Cleary

    If the send message is not processed by function and function is exited by some error or issue. When next time function runs successfully it will process all the unprocessed messages.

    I also created a durable function and trigger is service bus queue trigger, and used your code which you provided in the Question.

    function1.cs:

        using System.Collections.Generic;
        using System.Threading.Tasks;
        using Microsoft.Azure.WebJobs;
        using Microsoft.Azure.WebJobs.Extensions.DurableTask;
        using Microsoft.Extensions.Logging;
        using Newtonsoft.Json;
        using Newtonsoft.Json.Linq;
        using System;
        
        
        namespace FunctionApp1
        {
            public class MyObject
            { 
                public string Id { get; set; }
            }
            public static class Function1
            {
                [FunctionName("QueueTrigger")]
                public static void Run(
                    [ServiceBusTrigger("test", Connection = "connectionstring")] string myQueueItem,
                    [DurableClient] IDurableOrchestrationClient starter,
                    ILogger log)
                {
                    try
                    {
                        var dto = JsonConvert.DeserializeObject<MyObject>(myQueueItem);
                        // not to be confused by the MyObject id value
                        string id = starter.StartNewAsync("Orchestration", dto).Result;
        
                        log.LogInformation("Started orchestration with ID = '{id}'.", id);
                    }
                    catch (Exception ex)
                    {
                        log.LogError($"Error processing message: {ex.Message}");
                    }
                }
        
        
                [FunctionName("Orchestration")]
                public static async Task<List<string>> RunOrchestrator(
                    [OrchestrationTrigger] IDurableOrchestrationContext context)
                {
                    MyObject myQueueItem = context.GetInput<MyObject>();
        
                    var outputs = new List<string>();
        
                    outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello),myQueueItem.Id));
        
                    return outputs;
                }
        
                [FunctionName(nameof(SayHello))]
                public static void SayHello([ActivityTrigger] string name, ILogger log)
                {
                    log.LogInformation("Saying hello to {name}.", name);
                }
        
                
            }
        }
    

    local.settings.json:

        {
          "IsEncrypted": false,
          "Values": {
            "AzureWebJobsStorage": "UseDevelopmentStorage=true",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "connectionstring": "xxxxxxxxxxxxx"
          }
        }
    

    As you can see, I am getting message only one time.

    enter image description here

    enter image description here