Search code examples
c#wcfasynchronousasync-awaitsemaphore

How do I prevent "maxing out" of CPU: Synchronous method calling multiple workers asynchronously & throttling using SemaphoreSlim?


I am currently optimizing an existing, very slow and timing out production application. There is no option to re-write it.

In short, it is a WCF service that currently calls 4 other "worker" WCF services sequentially. None of the worker services are dependent on results from the other. So we would like it to call them all at once (not sequentially). I will reiterate that we don't have the luxury of re-writing it.

enter image description here

The optimization involves making it call all worker services at once. This is where asynchrony came to mind.

I have limited experience with asynchronous programming, but I have read as widely as I can on the topic, with respect to my solution.

The problem is, on testing, it works but maxes out my CPU. I would appreciate your help

The following is a simplified version of the essential code in main WCF Service

// The service operation belonging to main WCF Service
public void ProcessAllPendingWork()
{
    var workerTasks = new List<Task<bool>>();
    foreach(var workerService in _workerServices)
    {
        //DoWorkAsync is the worker method with the following signature:
        // Task<bool> DoWorkAsync()

        var workerTask = workerService.DoWorkAsync()
        workerTasks.Add(workerTask);
    }

    var task = Task.Run(async ()=>
    {
        await RunWorkerTasks(workerTasks);
    });
    task.Wait();


}

private async RunWorkerTasks(IEnumerable<Tast<bool>> workerTasks)
{
    using(var semaphore = new SemaphoreSlim(initialCount:3))
    {

        foreach (var workerTask in workerTasks)
        {
            await semaphore.WaitAsync();
            try
            {
                await workerTask;
            }
            catch (System.Exception)
            {
                //assume 'Log' is a predefined logging service
                Log.Error(ex);
            }
        }
    }
} 

What I have read:

Multiple ways how to limit parallel tasks processing

How to limit the amount of concurrent async I/O operations?

Approaches for throttling asynchronous methods in C#

Constraining Concurrent Threads in C#

Limiting Number of Concurrent Threads With SemaphoresSlim

Async WCF call with ChannelFactory and CreateChannel


Solution

  • Unless I miss something - your sample code runs ALL workers in parallel. By the time of calling 'workerService.DoWorkAsync()' the worker starts off it's job. 'RunWorkerTasks' only waits for the worker Task to complete. 'DoWorkAsync()' kicks off the async operation while 'await' pauses the calling method from execution until the awaited Task completes.

    The fact of high CPU usage is most likely due to your workerService's activity and not due to the way you call them. In order to verify that, try replacing workerService.DoWorkAsync() with Thread.Sleep(..) or Task.Delay(..). If your CPU usage drops, it is the workers to blame. (Depending on what workerService does) it might be ok or even expected that the CPU consumption increases once you run them in parallel.

    Comming to your question of how to limit parallel execution. Note, that the following sample does not exactly use 3 threads, but at maximum 3 threads.

        Parallel.ForEach(
            _workerServices,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            workerService => workerService.DoWorkAsync()
                .ContinueWith(res => 
                {
                    // Handle your result or possible exceptions by consulting res.
                })
                .Wait());
    

    As you mentioned that previously your code was executing sequentially, I assume that the workers also have a non-async equivalent. It is probably easier to use those. For calling an async method synchronously is mostly a hassle. I've even had deadlock scenarios just by calling DoWorkAsync().Wait(). There has been much discussion of How would I run an async Task<T> method synchronously?. In essence I try to avoid it. If that is not possible, I attempt to use ContinueWith which increases the complexity, or AsyncHelper of the previous SO-discussion.

        var results = new ConcurrentDictionary<WorkerService, bool>();
        Parallel.ForEach(
            _workerServices,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            workerService => 
                {
                    // Handle possible exceptions via try-catch.
                    results.TryAdd(workerService, workerService.DoWork());
                });
        // evaluate results
    

    Parallel.ForEach takes advantage of a Thread- or TaskPool. Meaning it dispatches every execution of the given parameter Action<TSource> body onto a dedicated thread. You can easily verify that with the following code. If Parallel.ForEach already dispatches the work on different Threads you can simply execute your 'expensive' operation synchronously. Any async operations would be unnecessary or even have bad impact on runtime performance.

        Parallel.ForEach(
            Enumerable.Range(1, 4),
            m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
    

    This is the demo project I used for testing which does not rely on your workerService.

        private static bool DoWork()
        {
            Thread.Sleep(5000);
            Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
            return DateTime.Now.Millisecond % 2 == 0;
        }
    
        private static Task<bool> DoWorkAsync() => Task.Run(DoWork);
    
        private static void Main(string[] args)
        {
            var sw = new Stopwatch();
            sw.Start();
    
            // define a thread-safe dict to store the results of the async operation
            var results = new ConcurrentDictionary<int, bool>();
    
            Parallel.ForEach(
                Enumerable.Range(1, 4), // this replaces the list of workers
                new ParallelOptions { MaxDegreeOfParallelism = 3 },
                // m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
                m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());
    
            sw.Stop();
    
            // print results
            foreach (var item in results)
            {
                Console.WriteLine($"{item.Key}={item.Value}");
            }
    
            Console.WriteLine(sw.Elapsed.ToString());
            Console.ReadLine();
        }