I have a durable function invoked by a queue trigger.
This durable function takes a long time to run, and I think this is causing the queue to redeliver the same message to the function multiple times.
    public async Task RunOrchestratorAsync(
        [ServiceBusTrigger("my-queue", Connection = "event-bus-connection")] string queueItem,
        [DurableClient] IDurableOrchestrationClient starter)
    {
        var dto = JsonConvert.DeserializeObject<MyObject>(queueItem);
        await starter.StartNewAsync("OrchestratorFunction", dto);
    }
As you can see, the messages are being delivered multiple times.
How can I stop this?
Paul
I agree with @Stephen Cleary.
If a message is sent but the function exits because of an error or some other issue before processing it, the message remains unprocessed. The next time the function runs successfully, it processes all the unprocessed messages.
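If the redeliveries come from the message lock expiring before the trigger function completes (an assumption; nothing in this thread confirms the cause), one setting worth checking is the lock auto-renewal window in host.json. The fragment below is only a sketch: it assumes version 5.x of the Service Bus extension, and the 10-minute value is purely illustrative. In 4.x the corresponding setting is maxAutoRenewDuration under messageHandlerOptions.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "autoCompleteMessages": true,
      "maxAutoLockRenewalDuration": "00:10:00"
    }
  }
}
```

This only helps when the trigger function itself outlives the lock. A trigger that merely starts an orchestration normally returns quickly, so the queue's delivery count and the function logs are worth checking first.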
I also created a durable function with a Service Bus queue trigger, using the code you provided in the question.
function1.cs:
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.DurableTask;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;

    namespace FunctionApp1
    {
        public class MyObject
        {
            public string Id { get; set; }
        }

        public static class Function1
        {
            // Starts one orchestration per Service Bus message.
            [FunctionName("QueueTrigger")]
            public static async Task Run(
                [ServiceBusTrigger("test", Connection = "connectionstring")] string myQueueItem,
                [DurableClient] IDurableOrchestrationClient starter,
                ILogger log)
            {
                try
                {
                    var dto = JsonConvert.DeserializeObject<MyObject>(myQueueItem);
                    // The orchestration instance ID; not to be confused with MyObject.Id.
                    // Awaiting avoids blocking the thread, as .Result would.
                    string id = await starter.StartNewAsync("Orchestration", dto);
                    log.LogInformation("Started orchestration with ID = '{id}'.", id);
                }
                catch (Exception ex)
                {
                    // The exception is logged and swallowed, so the function still returns normally.
                    log.LogError(ex, "Error processing message.");
                }
            }

            [FunctionName("Orchestration")]
            public static async Task<List<string>> RunOrchestrator(
                [OrchestrationTrigger] IDurableOrchestrationContext context)
            {
                MyObject myQueueItem = context.GetInput<MyObject>();
                var outputs = new List<string>();
                outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), myQueueItem.Id));
                return outputs;
            }

            // Returns a string so that CallActivityAsync<string> receives a value.
            [FunctionName(nameof(SayHello))]
            public static string SayHello([ActivityTrigger] string name, ILogger log)
            {
                log.LogInformation("Saying hello to {name}.", name);
                return $"Hello {name}!";
            }
        }
    }
local.settings.json:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "connectionstring": "xxxxxxxxxxxxx"
      }
    }
As you can see, I receive the message only once.