I am a strong believer in learning by reinventing. With that in mind, I set out to implement a custom thread pool. The objectives I set for myself were the following:
I achieved the objectives above, but I want to validate my approach with the experts on Stack Overflow. I would also like to learn whether there are better approaches, and how an expert in multithreading would solve this problem. The following paragraphs describe the challenge I faced and how I fixed it.
The thread pool I created internally maintains a queue of work items, from which the worker threads pick items and process them. Whenever a new item is queued, the pool signals an event so that any free thread can pick the item up and execute it.
I started with an AutoResetEvent to signal the waiting threads that new work items are on the queue, but I ran into the problem of lost signals. It happens when more than one item is queued while no thread is free to process them. The number of items that remain unprocessed equals the number of signals lost because of overlapping Set (signaling) calls.
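To make the lost-signal effect concrete, here is a minimal sketch that isolates it from the pool. The class and method names (`LostSignalDemo`, `CountObservedSignals`) are made up for this illustration. It calls Set several times while nobody is waiting and then counts how many of those signals a waiter can still observe.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class LostSignalDemo
{
    // Calls Set() `sets` times with no waiter present, then counts how many
    // signals can still be consumed afterwards.
    public static int CountObservedSignals(int sets)
    {
        using (var evt = new AutoResetEvent(false))
        {
            for (int i = 0; i < sets; i++)
                evt.Set();          // the event is a flag, not a counter: repeated calls coalesce

            int observed = 0;
            while (evt.WaitOne(0))  // zero timeout: poll without blocking
                observed++;         // each successful wait resets the event
            return observed;
        }
    }
}
```

`LostSignalDemo.CountObservedSignals(3)` returns 1, not 3: the event only remembers that it was signaled, not how many times, so two of the three signals are gone.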
To fix the problem of lost signals, I created a wrapper on top of AutoResetEvent and used it in its place. That fixed the problem. Here is the code listing:
public static class CustomThreadPool
{
    static CustomThreadPool()
    {
        // Create the worker threads up front, then start them all.
        for (int i = 0; i < minThreads; i++)
            _threads.Add(
                new Thread(ThreadFunc) { IsBackground = true }
            );
        _threads.ForEach((t) => t.Start());
    }

    public static void EnqueWork(Action action)
    {
        // Publish the item first, then signal that work is available.
        _concurrentQueue.Enqueue(action);
        _enqueEvent.Set();
    }

    private static void ThreadFunc()
    {
        while (true)
        {
            _enqueEvent.WaitOne();
            Action action;
            // TryDequeue returns false if the queue is empty; invoke only on success.
            if (_concurrentQueue.TryDequeue(out action))
                action();
        }
    }

    private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
    private static List<Thread> _threads = new List<Thread>();
    private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
    private static object _syncObject = new object();
    private const int minThreads = 4;
    private const int maxThreads = 10;

    public static void Test()
    {
        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****First*****");
        });
        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Second*****");
        });
        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Third*****");
        });
        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fourth*****");
        });
        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fifth*****");
        });
    }
}
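public class CountAutoResentEvent
{
    public void Set()
    {
        // Count and signal under the same lock, so a waiter cannot consume
        // the signal before the count has been updated.
        lock (_sync)
        {
            _countOfSet++;
            _event.Set();
        }
    }

    public void WaitOne()
    {
        _event.WaitOne();
        lock (_sync)
        {
            _countOfSet--;
            // More signals are pending: re-signal so another waiter wakes up.
            if (_countOfSet > 0)
                _event.Set();
        }
    }

    private AutoResetEvent _event = new AutoResetEvent(false);
    private int _countOfSet = 0;
    private object _sync = new object();
}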
Now, I have a few questions:
Thanks.
From what I've seen, I'd say it's correct. I like that you used ConcurrentQueue and didn't implement a synchronized queue of your own. That gets messy and will most likely not be as fast as the existing one.
I would only like to note that your custom "signaling mechanism" is actually very similar to a semaphore: a lock that allows more than one thread to enter a critical section. This functionality already exists in the Semaphore class.
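As a rough sketch of that idea, and not a drop-in replacement for your class, the counting wrapper could be swapped for a Semaphore whose count tracks the number of queued items. The class name `SemaphoreThreadPool`, the constructor parameter, and the exception handling in the worker are my own assumptions for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical variant of the pool, for illustration only.
public sealed class SemaphoreThreadPool
{
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();

    // The semaphore count is the number of queued items not yet claimed by a worker.
    private readonly Semaphore _available = new Semaphore(0, int.MaxValue);

    public SemaphoreThreadPool(int threadCount)
    {
        for (int i = 0; i < threadCount; i++)
            new Thread(Worker) { IsBackground = true }.Start();
    }

    public void EnqueueWork(Action action)
    {
        if (action == null) throw new ArgumentNullException("action");
        _queue.Enqueue(action);   // publish the item first...
        _available.Release();     // ...then add one permit for it
    }

    private void Worker()
    {
        while (true)
        {
            _available.WaitOne(); // blocks until at least one permit exists
            Action action;
            if (_queue.TryDequeue(out action))
            {
                // Assumption: log and continue, so one failing item does not kill the worker.
                try { action(); }
                catch (Exception ex) { Console.Error.WriteLine(ex); }
            }
        }
    }
}
```

Because every Release adds a permit instead of setting a flag, two items queued back to back wake two workers (or the same worker twice), so no signal is lost and no extra counter is needed.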