Search code examples
c#multithreadingblockingqueueblockingcollection

Thread Pool with BlockingCollection


Problem: There are multiple threads accessing a resource. I need to limit their number to a constant MaxThreads. Threads who cannot enter the thread pool should get an error message.

Solution: I started using a BlockingCollection<string> pool in the algorithm below, but I see that BlockingCollection requires a call to CompleteAdding, which I can't do, because I always get incoming threads (I hardcoded to 10 in the example below for debugging purposes), think web requests.

public class MyTest {

    private const int MaxThreads = 3;

    private BlockingCollection<string> pool;

    public MyTest() { 
        pool = new BlockingCollection<string>(MaxThreads);
    }

    public void Go() {
        var addSuccess = this.pool.TryAdd(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        if (!addSuccess) Console.WriteLine(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(string.Format("Adding thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine(string.Format("Pool size: {0}", pool.Count));

        // simulate work
        Thread.Sleep(1000);

        Console.WriteLine("Thread ID#{0} " + Thread.CurrentThread.ManagedThreadId + " is done doing work.");
        string val;
        var takeSuccess = this.pool.TryTake(out val);
        if (!takeSuccess) Console.WriteLine(string.Format("Failed to take out thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        Console.WriteLine("Taking out " + val);

        Console.WriteLine(string.Format("Pool size: {0}", pool.Count));
        Console.WriteLine(Environment.NewLine);
    }
}

static void Main()
{
    var t = new MyTest();

    Parallel.For(0, 10, x => t.Go());
}

Any ideas on how I can better achieve this?

Thanks!

P.S. Multi-threading newbie here, if you have any suggestions for reading materials, I would greatly appreciate them.

LE: Based on the answers I got, I was able to achieve the desired behavior using this algorithm:

public class MyTest {

    private const int MaxThreads = 3;

    private SemaphoreSlim semaphore;

    public MyTest() { 
        semaphore = new SemaphoreSlim(MaxThreads, MaxThreads);
    }

    public void Go() {

        Console.WriteLine(string.Format("In comes thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        semaphore.Wait();

        try {

        Console.WriteLine(string.Format("Serving thread ID#{0}", Thread.CurrentThread.ManagedThreadId));
        // simulate work
        Thread.Sleep(1000);
        Console.WriteLine(string.Format("Out goes thread ID#{0}", Thread.CurrentThread.ManagedThreadId));

        }

        finally {
            semaphore.Release();
        }

    }
}

static void Main()
{
    var t = new MyTest();

    Parallel.For(0, 10, x=> t.Go());
}

Solution

  • If you want to protect certain number of threads which can access a critical region at a time, you'll have to use Semaphore or SemaphoreSlim. I suggest latter one, which is light weight when compared to former.

    One disadvantage of SemaphoreSlim is that they won't work cross process, but that's fine we have Semaphore to help.

    You can test whether the Semaphore is full via one of the Wait methods provided by the framework with a timeout.

    SemaphoreSlim semaphore = new SemaphoreSlim(3, 3);
    
    if (!semaphore.Wait(0))
    {
        //Already semaphore full.
        //Handle it as you like
    }
    

    http://www.albahari.com/threading/ is a very good resource for threading.