
Why does BlockingCollection<T> slow down as you add more threads to it?


I am profiling BlockingCollection<T> with a view to using it in a UDP data-processing scenario. Currently it is used with a single consumer: incoming UDP data is written to the collection and then processed accordingly.

I thought I would profile it with multiple consumers / tasks to increase performance, but I'm seeing some strange results, so I must be missing something.

FYI, the cancellation token isn't being used for the profiling.

The essence is to queue up 1000000 numbers and then pull them out from different threads, which I thought would increase performance, but it's having the reverse effect.

Here is the basic setup for testing / profiling (console application):

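    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    class Program {
        static BlockingCollection<int> Queue = new BlockingCollection<int>();
        static CancellationTokenSource m_tokenProcessData;
        static CancellationToken m_cancellationToken;
        static DateTime m_startTime;

        static void Main(string[] args) {
            m_tokenProcessData = new CancellationTokenSource();
            m_cancellationToken = m_tokenProcessData.Token;
            PrepareQueue();
            StartTasks(1);
            Console.ReadKey();
        }

        // Fill the queue with 1000000 numbers before any consumer starts.
        static void PrepareQueue() {
            for (int i = 0; i < 1000000; i++) {
                Queue.Add(i);
            }
        }

        // Start maxTasks consumers, each on its own long-running task.
        static void StartTasks(int maxTasks) {
            m_startTime = DateTime.Now;
            for (int i = 0; i < maxTasks; i++) {
                Task.Factory.StartNew(() => ProcessData(), m_cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }
        }

        // Pull numbers off the queue until it is empty, then report the time.
        static void ProcessData() {
            foreach (var number in Queue.GetConsumingEnumerable()) {
                Task.Delay(10); // stand-in for real work
                var test = Queue.Count;
                if (test == 0) Finish();
            }
        }

        static void Finish() {
            var endTime = DateTime.Now;
            var timeTaken = (endTime - m_startTime).TotalMilliseconds;
            Console.WriteLine($"Processing Took {timeTaken}ms");
        }
    }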

Task.Delay(10) is just in there to simulate some work being done.

Testing Results

1 Task = 3217ms
2 Tasks = 3178ms
4 Tasks = 3365ms
8 Tasks = 3986ms
16 Tasks = 4380ms
32 Tasks = 3954ms
64 Tasks = 4854ms

Can anyone point out what I might be missing or not understanding?

Thanks,

Daniel.


Solution

  • BlockingCollection<T> is a thread-safe component and contains a synchronization primitive (a SemaphoreSlim object). My guess is that the more threads you throw at it, the more time those threads spend waiting on the semaphore. See Line 431 in the BlockingCollection source code.

    Throwing more threads at a problem also increases overhead. It often happens that the optimal solution is not more threads. Depending on the Producer/Consumer scenario, the optimal number of threads often turns out to be the number of cores you have in your computer, because beyond that, the threads are merely doing task-switching.
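
One way to check this on your own machine is to repeat the measurement with a fixed amount of simulated work per item and a varying number of consumers. The following is only a rough sketch under a few assumptions: the names ConsumerBenchmark, Run and spinIterations are made up for illustration, Thread.SpinWait stands in for real CPU-bound work, and Stopwatch replaces DateTime.Now for timing. Note that the Task.Delay(10) in the question is never awaited, so it returns immediately and does not pause the consumer; each item therefore involves almost no work, and the test mostly measures the synchronization inside the collection. The sketch also calls CompleteAdding() so that the consumers finish on their own instead of polling Queue.Count.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerBenchmark
{
    // Drains a pre-filled queue with consumerCount consumers.
    // Returns the number of items processed; elapsed receives the drain time.
    // spinIterations (hypothetical parameter) controls the simulated work per item.
    public static int Run(int itemCount, int consumerCount, int spinIterations, out TimeSpan elapsed)
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
        }

        // Without this, GetConsumingEnumerable() blocks forever once the queue is empty.
        queue.CompleteAdding();

        int processed = 0;
        var stopwatch = Stopwatch.StartNew();

        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (var number in queue.GetConsumingEnumerable())
                {
                    Thread.SpinWait(spinIterations);      // simulated CPU-bound work
                    Interlocked.Increment(ref processed); // thread-safe counter
                }
            }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default))
            .ToArray();

        // Wait until every consumer has seen the end of the queue.
        Task.WaitAll(consumers);
        stopwatch.Stop();
        queue.Dispose();

        elapsed = stopwatch.Elapsed;
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine($"Logical processors: {Environment.ProcessorCount}");

        foreach (int consumerCount in new[] { 1, 2, 4, 8, 16 })
        {
            TimeSpan elapsed;
            int processed = Run(1000000, consumerCount, 100, out elapsed);
            Console.WriteLine($"{consumerCount} consumer(s): {processed} items in {elapsed.TotalMilliseconds:F0}ms");
        }
    }
}
```

With a small spinIterations value, the time spent inside the collection dominates and extra consumers are unlikely to help. Raising it lets you see at what point, if any, extra consumers start to pay off, which you can compare with the reported processor count.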