Search code examples
c#azure-functionsazure-durable-functions

Locally run sub-orchestration fails on WaitForExternalEvent task creation


I am testing C# Azure Durable Functions (isolated .NET 8) as a Proof of Concept for a new project. My workflow is not very complex yet.

  1. HTTP-Trigger to start new orchestration with some data
  2. Store data in Blob storage
  3. Call Sub-Orchestration
  4. Sub-Orchestration -> Download file
  5. Sub-Orchestration -> WaitForExternalEvent
  6. When ExternalEvent is received via HTTP-Trigger, finish Sub-Orchestration and return

I am running the code locally via Visual Studio Code with an Azurite storage emulator. Once the WaitForExternalEvent task is created (not even awaited) in the Sub-Orchestration via

var finished = context.WaitForExternalEvent<string>("Finished");

Then the sub-orchestration throws an Exception:

An invalid asynchronous invocation was detected. This can be caused by awaiting non-durable tasks in an orchestrator function's implementation or by middleware that invokes asynchronous code.

If I use the same code to wait for an external event within the main orchestration, no error is thrown and everything works as expected.

Is there something that I am missing in regards to an Orchestration and Sub-Orchestration within the same project, WaitForexternalEvent or similar? Might it be my Visual Studio Code setup? I previously had all code in one Class, but that showed the same issues.

Please find below a shortened version of my code.

Main Orchestration Function/Class

{
    public static class MainOrchestration
    {        
        [Function(nameof(MainOrchestration))]
        public static async Task<List<string>> RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context, OrchestrationInputDto orchestrationInput)
        {
            ILogger logger = context.CreateReplaySafeLogger(nameof(MainOrchestration));
            List<JobDetails> jobDetails = await RouteDocument(logger, context, orchestrationInput);
            foreach (JobDetails jobDetail in jobDetails) logger.LogInformation("Do some logging");

            // code from Microsoft example
            var outputs = new List<string>();

            return outputs;
        }

        public static async Task<List<JobDetails>> RouteDocument(ILogger logger, TaskOrchestrationContext context, OrchestrationInputDto orchestrationInput) {
            // ToDo: Later use multiple tasks in parallel, only create structure for now
            var parallelTasks = new List<Task<JobDetails>>();

            // add multiple tasks here in future (fan out). For now, only add one
            context.SetCustomStatus("Document is being processed");
            Task<JobDetails> task = context.CallSubOrchestratorAsync<JobDetails>(nameof(SubOrchestration), orchestrationInput);
            parallelTasks.Add(task);

            // wrapper-task that completes when all tasks in parallelTasks have completed
            await Task.WhenAll(parallelTasks);

            List<JobDetails> result = [];
            foreach (Task<JobDetails> subOrchestration in parallelTasks) {
                result.Add(await subOrchestration);
            }

            return result;
        } 


        [Function("ReceiveFileWithParameters")]
        public static async Task<HttpResponseData> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            // parse input, store file and spawn MainOrchestration instance
            ...
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(MainOrchestration), new OrchestrationInputDto(requestParametersDto, storeFileParameters), null);
            logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

            // Returns an HTTP 202 response with an instance management payload.
            // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration
            var response = client.CreateCheckStatusResponse(req, instanceId);
            return response;
        }

Sub-Orchestration Function/Class public static class SubOrchestration

{
    [Function(nameof(SubOrchestration))]
    public static async Task<JobDetails> RunOrchestrator(
        [OrchestrationTrigger] TaskOrchestrationContext context, OrchestrationInputDto orchestrationInput)
    {
        ILogger logger = context.CreateReplaySafeLogger(nameof(SubOrchestration));
        JobDetails jobDetail = await SendFileToFunction(orchestrationInput);
        // Wait for external event
        try {
            var finished = context.WaitForExternalEvent<string>("Finished");
            var failed = context.WaitForExternalEvent<string>("Failed");

            var winner = await Task.WhenAny(finished, failed);
            if (winner == finished)
            {
                logger.LogInformation("Successful");
            }
            else if (winner == failed)
            {
                logger.LogInformation("Failed");
            }
        }
        catch (Exception) {
            logger.LogInformation("Error");
        }
        return jobDetail;
    }

    [Function("SendFileToFunction")]
    public static async Task<JobDetails> SendFileToFunction([ActivityTrigger] OrchestrationInputDto orchestrationInput) {
        /// simply returns a JobDetails entity for now
        return new JobDetails(...);
    }

    [Function("ReceiveExternalEvent")]
    public static async Task ReceiveExternalEvent(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestData req,
        [DurableClient] DurableTaskClient client,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger(nameof(ReceiveExternalEvent));
        logger.LogInformation("ReceiveExternalEvent HTTP trigger function processed a request.");
       
        await client.RaiseEventAsync(instanceId, "Finished", "true");
    }
}

Solution

  • Turns out I called the Activity function "SendFileToFunction" simply via await, instead of via context.CallActivityAsync(...). This solves the problem.