Tags: c#, asynchronous, async-await, parallel-processing, parallel.foreach

Async parallel loop returns outside of task too quickly unless I while loop and check for a complete boolean


My functions return information really fast, which is great news! However, because the parallel loop runs asynchronously, the function returns a value before the loop finishes, unless I do some long-running work on the main thread to wait for a result. There is no UI to hold up, so I'm using async/await to push for quick results in the TPL.

I have introduced a boolean flag value and while loop to wait for a result.

This works but seems strange.

Is there a better way to 'await' the result in my particular situation? It is in the first code snippet, where I use the 'while loop', that things seem weird.

Note: I should mention that because this is an Alexa response, there is a task outside of this function which does a 'Task.Delay' for eight seconds and then returns a response, in case my other task takes too long and Alexa is about to time out.

    private static string SavedImageAnalysisResult(IReadOnlyCollection<Image> savedImageList, ConfigurationDto config)
    {
        string result = "[]";
        BreakImageAnalysis = false;

        if (!savedImageList.Any()) return result;

        Parallel.ForEach(savedImageList, new ParallelOptions
        {
            MaxDegreeOfParallelism = 5000
        },
            async (image, loopState) =>
            {
                Task.Run(() => Console.Write("██"));

                string threadLocalAnalysisResult =
                    await AnalyzeImageAsync(image.ImageBytes, config);

                if (IsEmptyOrErrorAnalysis(threadLocalAnalysisResult)) return;
                Task.Run(() => Console.Write("█ █"));
                result = threadLocalAnalysisResult;
                BreakImageAnalysis = true;
                loopState.Break();
            });

        while (!BreakImageAnalysis) if (BreakImageAnalysis) break; //strange to do this?

        return result;
    }

This function is called like this:

    public static List<Person> DetectPersonAsync()
    {
        Task.Run(() => Console.WriteLine("{0}\nNew Person Detection Requested...", DateTime.Now.ToString("f")));

        ConfigurationDto config = Configuration.GetSettings();

        camera = new SecurityCamera();

        byte[] imageData = camera.GetImageAsByte(config.SecurityCameraUrl +
                                                 config.SecurityCameraStaticImage +
                                                 DateTime.Now);

        if (!imageData.Any()) return null;

        string imageAnalysis = "[]";

        SavedImageList = camera.ImageCache;

        Task.Run(() => Console.WriteLine("\nBegin Image Analysis...\n"));

        var imageAnalysisTasks = new[]
        {
            Task.Factory.StartNew(() => SavedImageAnalysisResult(SavedImageList, config)),
            Task.Factory.StartNew(() => InProgressImageAnalysisResult(camera, config))
        };

        Task.WaitAll(imageAnalysisTasks);

        Task.Run(() => Console.WriteLine("\n\nAnalysis complete\n"));

        if (!IsEmptyOrErrorAnalysis(imageAnalysisTasks[0].Result))
            imageAnalysis = imageAnalysisTasks[0].Result;

        if (!IsEmptyOrErrorAnalysis(imageAnalysisTasks[1].Result))
            imageAnalysis = imageAnalysisTasks[1].Result;

        return !IsEmptyOrErrorAnalysis(imageAnalysis)
            ? JsonConvert.DeserializeObject<List<Person>>(imageAnalysis)
            : new List<Person>();
    }

That function, in turn, is called like this:

    if (alexa.IsLaunchRequest(alexaRequest))
    {
        // We don't want to wait for these two tasks to return
        Task.Run(() => SendSecurityImageToMagicMirrorUi());
        Task.Run(() => alexa.PostDirectiveResponseAsync(alexaRequest));

        // On your marks, get set, go! 8 seconds and counting
        return await Task.WhenAny(new[]
        {
            Task.Run(() => GetAlexaCognitiveResponseAsync()),
            Task.Run(() => AlexaRequestTimeoutMonitor())
        }).Result;
    }

And finally, there is the timeout function, which returns once 8 seconds are up:

    private static async Task<object> AlexaRequestTimeoutMonitor()
    {
        await Task.Delay(new TimeSpan(0, 0, 0, 8));
        
        return AlexaApi.ResponseBuilder(CreateNoPersonDetectionPhrase(new AlexaSynthesisResponseLibrary()), false);
    }

It is in the CreateNoPersonDetectionPhrase function that I turn the IsFound boolean flag back to false.


Solution

  • Parallel.ForEach accepts synchronous delegates, so your async lambda is treated as async void. Each iteration really does run in parallel, but only up to the await AnalyzeImageAsync point. There the delegate returns to the loop, which has no task to observe and so "thinks" the iteration is complete, while in reality the asynchronous operation has most likely only just started. Since nothing awaits those operations, Parallel.ForEach completes very quickly. You need to expose the tasks outside of the Parallel.ForEach loop so that they can be awaited, for example with Task.WhenAll.

    I'd also suggest considering whether you need this parallelization at all: if most of the AnalyzeImageAsync operation (which isn't shown here) isn't CPU-bound, it's more reasonable to rely on asynchrony alone.

    The simplest way (I'm not saying the best) to wait for the async tasks started inside Parallel.ForEach:

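        public static async Task SomeAsyncTask()
        {
            await Task.Delay(5000);
            // Runs as the continuation, after the delay has completed.
            Console.WriteLine("2222");
        }

        public static async Task Loop()
        {
            var collection = new[] { 1, 2, 3, 4 };
            // One slot per item, so each iteration can store the task it starts.
            var asyncTasks = new Task[collection.Length];

            Parallel.ForEach(collection,
                (item, loop, index) =>
                {
                    Console.WriteLine("1111");
                    // Start the async work but don't await it inside the loop body.
                    asyncTasks[index] = SomeAsyncTask();
                });

            // Parallel.ForEach has returned; now wait for the async work itself.
            await Task.WhenAll(asyncTasks);
        }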
    

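    Note that any work that must run after the asynchronous operation completes should be moved into the continuation of the task (in this case, the Console.WriteLine("2222") that follows the await inside SomeAsyncTask), not placed after the SomeAsyncTask() call in the loop body.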
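
For the asynchrony-only route suggested above, a minimal sketch might look like the following. It assumes the analysis is I/O-bound and that "[]" marks an empty result, as in the question. FakeAnalyzeAsync and FirstUsableResultAsync are hypothetical names made up for illustration. They stand in for AnalyzeImageAsync and for the flag-plus-while-loop logic, and are not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FirstResultSketch
{
    // Hypothetical stand-in for AnalyzeImageAsync: waits, then returns a canned result.
    public static async Task<string> FakeAnalyzeAsync(int delayMs, string result)
    {
        await Task.Delay(delayMs);
        return result;
    }

    // Awaits the already-started analyses as they finish and returns the first
    // result that is not the empty marker "[]". Returns "[]" if none qualifies.
    // A faulted analysis rethrows its exception at "await finished".
    public static async Task<string> FirstUsableResultAsync(IEnumerable<Task<string>> analyses)
    {
        var pending = analyses.ToList();

        while (pending.Count > 0)
        {
            // Task.WhenAny completes as soon as any one pending task completes.
            Task<string> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            string result = await finished;
            if (result != "[]") return result;
        }

        return "[]";
    }

    public static async Task Main()
    {
        // All three analyses start immediately and run concurrently.
        var analyses = new[]
        {
            FakeAnalyzeAsync(300, "[]"),
            FakeAnalyzeAsync(100, "[]"),
            FakeAnalyzeAsync(200, "[{\"name\":\"Alice\"}]"),
        };

        string result = await FirstUsableResultAsync(analyses);
        Console.WriteLine(result); // prints [{"name":"Alice"}]
    }
}
```

Because the method itself is awaited, no flag or busy-wait loop is needed: the caller resumes only once a usable result exists or every analysis has finished. Analyses still pending when a result is found keep running in the background. Cancelling them (for example with a CancellationToken) is left out of this sketch.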