Search code examples
c#multithreadingsynchronizationsemaphorecancellation

Wait for all work items to complete before exiting when a request for cancellation is received


I have a loop that dispatches work items to multiple threads. The number of concurrent threads is limited through a semaphore. The loop should exit when a request for cancellation is signaled. However it should wait until all currently running threads have completed their work before exiting.

How would I best do this? Do I need to introduce another synchronization primitive and if so, which would be best suited here? Or is there perhaps even a way to do this with only the existing semaphore?

using System;
using System.Threading;

namespace ConsoleApp10 {
    class Program {
        static void Main() {
            var cts = new CancellationTokenSource();
            var t = new Thread(Loop);
            t.Start(cts.Token);
            Console.WriteLine("Press any key to cancel.");
            Console.ReadKey();
            Console.WriteLine("Cancelling loop");
            cts.Cancel();
            t.Join();
            Console.WriteLine("At this point all worker threads must have exited.");
            Console.ReadLine();
        }

        static void Loop(object state) {
            var ct = (CancellationToken)state;
            var numMaxThreads = 4;
            var sema = new SemaphoreSlim(numMaxThreads);
            try {
                while (true) {
                    sema.Wait(ct);
                    ThreadPool.QueueUserWorkItem(_ => {
                        try {
                            Work();
                        } finally {
                            sema.Release();
                        }
                    });
                }
            } catch (OperationCanceledException) {
                // Block until all currently running work items have been
                // completed before continuing.
            }
        }

        static void Work() {
            var r = new Random(Guid.NewGuid().GetHashCode());
            Console.WriteLine($"  {Thread.CurrentThread.ManagedThreadId}: Starting");
            var until = DateTime.Now.AddMilliseconds(1000 + r.Next(5000));
            while (true) {
                var left = until.Subtract(DateTime.Now);
                if (left <= TimeSpan.Zero)
                    break;
                Console.WriteLine($"  {Thread.CurrentThread.ManagedThreadId}: " +
                    $"Working, Left: {left}");
                Thread.Sleep(250 + r.Next(100) * 10);
            }
            Console.WriteLine($"  {Thread.CurrentThread.ManagedThreadId}: Ending");
        }
    }
}

PS. I would like to avoid using Tasks or the Tasks Parallel Library.


Solution

  • Or is there perhaps even a way to do this with only the existing semaphore?

    This. You can just Wait the semaphore as many times as the numMaxThreads:

    catch (OperationCanceledException)
    {
        for (int i = 0; i < numMaxThreads; i++) sema.Wait();
    }
    

    PS. I would like to avoid using Tasks or the Tasks Parallel Library.

    I don't know why you want to avoid the TPL. Basically what you are trying to do is to invent the Parallel.While method. You can find here a modern implementation, based on the Parallel.For. The advantage of using the TPL is that you get automatic termination of the loop in case of an error, and proper propagation of all errors bundled in an AggregateException. The TPL uses also the current thread as a worker thread, so you don't waste a thread just to control the other threads. In case you want to free the current thread, because it has to do work like Console.ReadKey(), you can offload the whole loop to the ThreadPool with Task t = Task.Run(() => Parallel.For(/*...*/));. The Parallel APIs support cancellation with the ParallelOptions.CancellationToken property.