Search code examples
azureazure-functionsazureservicebus

Azure Function / Azure Service Bus: Manually completing a message


I'd like to mark a message as complete after successful completion of all activities. In this case autoCompleteMessages is set to false in host.json.

I can complete or dead letter a message from the ServiceBusTrigger function, but how do I ensure all activities succeeded?

Can it be done in the OrchestrationTrigger function?

FunctionName("QueueStart")]
public static async Task Run(
    [ServiceBusTrigger("%QueueTopicName%", "Subscription", Connection = "ServiceBusConnectionString")]
    ServiceBusReceivedMessage msg,
    ServiceBusMessageActions messageActions,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{

    string inputMessage = Encoding.UTF8.GetString(msg.Body);
    
    await starter.StartNewAsync("Hello", null, inputMessage);

    // can run here, but how to check if all activities succeeded?
    // await messageActions.CompleteMessageAsync(msg);
    // await messageActions.DeadLetterMessageAsync(msg);

}

[FunctionName("Hello")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    var outputs = new List<string>();

    outputs.Add(await context.CallActivityAsync<string>("Hello_Hello", "London"));
    outputs.Add(await context.CallActivityAsync<string>("Hello_Hello1", "Seattle"));

    // how to mark message as complete here?

    return outputs;
}

[FunctionName("Hello_Hello")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
    log.LogInformation($"Saying hello to {name}.");
    return $"Hello {name}!";
}

[FunctionName("Hello_Hello1")]
public static string SayHello1([ActivityTrigger] string city, ILogger log)
{
    throw new Exception("Exception from hello1");

    log.LogInformation($"Saying hello1 to {city}.");
    return $"Hello {city}!";
}

Solution

  • the following included in the ServiceBusTrigger does the trick

    string instanceID = Guid.NewGuid();
    await starter.StartNewAsync("Hello", instanceID, inputMessage);
    
    var orchestratorStatus = await starter.GetStatusAsync(instanceID);
    
    while (orchestratorStatus.RuntimeStatus == OrchestrationRuntimeStatus.Running || orchestratorStatus.RuntimeStatus == OrchestrationRuntimeStatus.Pending)
    {
        await Task.Delay(1000);
        orchestratorStatus = await starter.GetStatusAsync(instanceID);
    }
    
    if (orchestratorStatus.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
    {
        logger.LogInformation($"Completed orchestration with ID = '{instanceID}'.");
        await messageActions.CompleteMessageAsync(msg);
    }