I have a loop that dispatches work items to multiple threads. The number of concurrent threads is limited through a semaphore. The loop should exit when a request for cancellation is signaled. However, it should wait until all currently running threads have completed their work before exiting.
How would I best do this? Do I need to introduce another synchronization primitive and if so, which would be best suited here? Or is there perhaps even a way to do this with only the existing semaphore?

    using System;
    using System.Threading;

    namespace ConsoleApp10 {
        class Program {
            static void Main() {
                var cts = new CancellationTokenSource();
                var t = new Thread(Loop);
                t.Start(cts.Token);
                Console.WriteLine("Press any key to cancel.");
                Console.ReadKey();
                Console.WriteLine("Cancelling loop");
                cts.Cancel();
                t.Join();
                Console.WriteLine("At this point all worker threads must have exited.");
                Console.ReadLine();
            }

            static void Loop(object state) {
                var ct = (CancellationToken)state;
                var numMaxThreads = 4;
                var sema = new SemaphoreSlim(numMaxThreads);
                try {
                    while (true) {
                        sema.Wait(ct);
                        ThreadPool.QueueUserWorkItem(_ => {
                            try {
                                Work();
                            } finally {
                                sema.Release();
                            }
                        });
                    }
                } catch (OperationCanceledException) {
                    // Block until all currently running work items have been
                    // completed before continuing.
                }
            }

            static void Work() {
                var r = new Random(Guid.NewGuid().GetHashCode());
                Console.WriteLine($" {Thread.CurrentThread.ManagedThreadId}: Starting");
                var until = DateTime.Now.AddMilliseconds(1000 + r.Next(5000));
                while (true) {
                    var left = until.Subtract(DateTime.Now);
                    if (left <= TimeSpan.Zero)
                        break;
                    Console.WriteLine($" {Thread.CurrentThread.ManagedThreadId}: " +
                        $"Working, Left: {left}");
                    Thread.Sleep(250 + r.Next(100) * 10);
                }
                Console.WriteLine($" {Thread.CurrentThread.ManagedThreadId}: Ending");
            }
        }
    }

PS. I would like to avoid using Tasks or the Task Parallel Library.
> Or is there perhaps even a way to do this with only the existing semaphore?
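Yes, there is. You can simply Wait on the semaphore as many times as numMaxThreads. Each of these calls returns only after a worker has released its slot, so once the loop holds all numMaxThreads slots, no work item can still be running: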

    catch (OperationCanceledException)
    {
        for (int i = 0; i < numMaxThreads; i++) sema.Wait();
    }

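To show how that catch block fits into the whole loop, here is a minimal sketch. The class name DrainSketch, the method RunLoop, its parameters, and the running counter are hypothetical additions for illustration; the counter exists only to check that nothing is still in flight once the drain has finished.

```csharp
using System;
using System.Threading;

static class DrainSketch
{
    // Hypothetical helper: dispatches `work` until `ct` is cancelled, then
    // blocks until every in-flight work item has released its slot.
    // Returns the number of work items still running on exit (expected: 0).
    public static int RunLoop(CancellationToken ct, int numMaxThreads, Action work)
    {
        var sema = new SemaphoreSlim(numMaxThreads);
        int running = 0; // in-flight work items, for verification only

        try
        {
            while (true)
            {
                // Throws OperationCanceledException once ct is cancelled;
                // in that case no slot is taken.
                sema.Wait(ct);
                Interlocked.Increment(ref running);
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        work();
                    }
                    finally
                    {
                        // Decrement before Release so the counter is already
                        // up to date when the drain below gets the slot.
                        Interlocked.Decrement(ref running);
                        sema.Release();
                    }
                });
            }
        }
        catch (OperationCanceledException)
        {
            // Reclaim every slot. Slots held by workers become available
            // only when those workers call Release.
            for (int i = 0; i < numMaxThreads; i++) sema.Wait();
        }

        return Volatile.Read(ref running);
    }
}
```

Note that the semaphore is left with zero available slots afterwards. That is fine here because it is local to the loop, but a shared semaphore would need the slots released again.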
> PS. I would like to avoid using Tasks or the Task Parallel Library.
I don't know why you want to avoid the TPL. Basically, what you are trying to do is to invent a Parallel.While method. You can find a modern implementation here, based on Parallel.For. The advantage of using the TPL is that you get automatic termination of the loop in case of an error, and proper propagation of all errors, bundled in an AggregateException. The TPL also uses the current thread as a worker thread, so you don't waste a thread just to control the other threads. In case you want to free the current thread because it has other work to do, like Console.ReadKey(), you can offload the whole loop to the ThreadPool with Task t = Task.Run(() => Parallel.For(/*...*/)); and wait on that task later. The Parallel APIs support cancellation through the ParallelOptions.CancellationToken property.
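A rough sketch of that TPL variant follows, under a few assumptions: ParallelLoopSketch and Start are hypothetical names, int.MaxValue stands in for "loop until cancelled", and MaxDegreeOfParallelism takes over the role of the semaphore. It is not the implementation referred to above, just one way to wire Task.Run, Parallel.For, and ParallelOptions together.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ParallelLoopSketch
{
    // Hypothetical helper: runs `work` repeatedly on up to `maxThreads`
    // threads until `ct` is cancelled. The returned task completes only
    // after Parallel.For has returned, i.e. after the iterations that
    // were already running have finished.
    public static Task Start(CancellationToken ct, int maxThreads, Action work)
    {
        var options = new ParallelOptions
        {
            CancellationToken = ct,
            MaxDegreeOfParallelism = maxThreads
        };

        // Task.Run moves the loop to the ThreadPool so the caller stays free.
        return Task.Run(() =>
        {
            try
            {
                // Assumption: int.MaxValue iterations approximates an
                // unbounded loop for the purposes of this sketch.
                Parallel.For(0, int.MaxValue, options, _ => work());
            }
            catch (OperationCanceledException)
            {
                // Thrown by Parallel.For after cancellation was requested
                // and the running iterations have returned.
            }
        });
    }
}
```

In the Main method of the question, this would replace the dedicated thread: start the loop with Task t = ParallelLoopSketch.Start(cts.Token, 4, Work); and call t.Wait() where t.Join() was. An exception thrown by work is not swallowed by the sketch; it surfaces from t.Wait() wrapped in an AggregateException.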