
How to concurrently take items from the BlockingCollection?


I have a BlockingCollection of integers. Below is the code I am writing to remove its items concurrently.

    static void Produce()
    {
        for (int i = 0; i < 100000; i++)
        {
            bc3.Add(i);
        }
        Run();
    }

    static void Run()
    {
        for (int i = 0; i < 5; i++)
        {
            Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
        }
    }

    static void Process()
    {
        var stopWatch = new Stopwatch();
        stopWatch.Start();
        while (bc3.Count != 0)
        {
            bc3.Take();
        }
        stopWatch.Stop();
        Console.WriteLine("Thread {0}", Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("Elapsed Time {0}", stopWatch.Elapsed.Seconds);
    }

Is creating 5 tasks like this the correct way to remove the items faster?


Solution

  • Stopwatch issue

    Your measurement results are wrong because you use

    stopWatch.Elapsed.Seconds
    

    instead of

    stopWatch.ElapsedMilliseconds
    

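    Elapsed.Seconds is only the seconds component of the TimeSpan (0 to 59). It drops the milliseconds as well as the minutes, hours, and days, so a run that takes less than a second prints 0. If you prefer seconds, use Elapsed.TotalSeconds, which covers the whole duration.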
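    A minimal sketch of the difference, assuming a .NET 6+ console project with top-level statements. It uses a fixed, illustrative TimeSpan of 1 minute 2.5 seconds (not a measurement) so the values do not depend on timing:

```csharp
using System;
using System.Diagnostics;

// Illustrative fixed duration: 1 minute, 2 seconds, 500 milliseconds.
TimeSpan elapsed = new TimeSpan(0, 0, 1, 2, 500);

int secondsComponent = elapsed.Seconds;       // seconds component only (0 to 59): 2
double totalSeconds = elapsed.TotalSeconds;   // whole duration in seconds: 62.5
double totalMs = elapsed.TotalMilliseconds;   // whole duration in milliseconds: 62500

Console.WriteLine($"Seconds={secondsComponent} TotalSeconds={totalSeconds} TotalMilliseconds={totalMs}");

// With a real Stopwatch, ElapsedMilliseconds is the whole duration as a long.
var stopWatch = Stopwatch.StartNew();
// ... work to be measured goes here ...
stopWatch.Stop();
Console.WriteLine("Elapsed Time {0} ms", stopWatch.ElapsedMilliseconds);
```

    Elapsed.Seconds reports 2 for this duration, while TotalSeconds and TotalMilliseconds report all of it.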

  • Concurrency issue

    No, this is not the correct way to remove items from a BlockingCollection. The statement that does not work is the loop condition

    bc3.Count != 0
    

    All 5 tasks may check this condition at the same time and see that 1 item is left. All 5 then call

    bc3.Take();
    

    One task gets the item. The other 4 block inside Take() forever, because nothing will ever add another item.
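    The blocking can be observed without hanging the program by using TryTake with a timeout in place of Take(). The sketch below is single-threaded for determinism and uses an illustrative item value of 42 and a 100 ms timeout; it plays the part of the task that wins and one task that loses, then shows what changes once CompleteAdding() has been called (assuming .NET 6+ top-level statements):

```csharp
using System;
using System.Collections.Concurrent;

var bc3 = new BlockingCollection<int>();
bc3.Add(42);   // one item left, and adding has not been marked complete

// The winning task gets the last item.
bool firstTaken = bc3.TryTake(out int first, 100);

// A losing task finds the collection empty. Take() would block forever here;
// TryTake with a 100 ms timeout gives up and returns false instead.
bool secondTaken = bc3.TryTake(out _, 100);

// After CompleteAdding(), Take() on the empty collection no longer blocks: it throws.
bc3.CompleteAdding();
bool threw = false;
try
{
    bc3.Take();
}
catch (InvalidOperationException)
{
    threw = true;
}

Console.WriteLine($"first={firstTaken} ({first}), second={secondTaken}, Take threw={threw}");
```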

    One way to solve this is to add

    bc3.CompleteAdding();
    

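    in Produce(), after the loop that adds the items and before Run(). Once the collection is marked as complete, Take() on an empty collection no longer blocks; it throws an InvalidOperationException. The tasks that lose the race therefore end with that exception instead of hanging.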
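    An alternative to the Count/Take() loop is to let each task iterate GetConsumingEnumerable(). The loop waits while the collection is empty and ends without an exception once the collection is empty and CompleteAdding() has been called. A minimal sketch, assuming .NET 6+ top-level statements; the consumed counter is illustrative, and here the producer runs after the consumers have started:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var bc3 = new BlockingCollection<int>();
int consumed = 0;   // items removed across all consumers (illustrative)

// Five consumers start first and wait until items arrive.
Task[] consumers = Enumerable.Range(0, 5)
    .Select(_ => Task.Factory.StartNew(() =>
    {
        // Ends when the collection is empty AND CompleteAdding() has been called.
        foreach (int item in bc3.GetConsumingEnumerable())
        {
            Interlocked.Increment(ref consumed);
        }
    }, TaskCreationOptions.LongRunning))
    .ToArray();

// Producer: add the items, then signal that no more will come.
for (int i = 0; i < 100000; i++)
{
    bc3.Add(i);
}
bc3.CompleteAdding();

Task.WaitAll(consumers);   // the consuming loops end normally, so nothing is thrown
Console.WriteLine($"Consumed {consumed} items");
```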

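  • Task lifetime issue

    Run() does not wait for the tasks it starts; it returns as soon as they are scheduled. Running tasks are not garbage collected, but they run on background threads, and background threads do not keep the process alive. If the program ends right after Produce() returns, the process can exit before every task has printed its messages, so you may see fewer than 5 completion messages.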

    To fix this, use

        static void Run()
        {
            var tasks = new List<Task>();
            for (int i = 0; i < 5; i++)
            {
                tasks.Add(Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning));
            }
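            try
            {
                // Block until all 5 tasks have finished.
                Task.WaitAll(tasks.ToArray());
            }
            catch (AggregateException)
            {
                // Expected once CompleteAdding() has been called: the tasks that
                // found the collection empty threw InvalidOperationException from Take().
            }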
        }
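    When all items are added before the tasks start, as in the question, another option is TryTake(). Without a timeout it returns false immediately when the collection is empty, so no task blocks or throws and there is no exception to catch around Task.WaitAll. A sketch under that assumption (.NET 6+ top-level statements; the taken and finished counters are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var bc3 = new BlockingCollection<int>();
for (int i = 0; i < 100000; i++)
{
    bc3.Add(i);
}

int taken = 0;     // items removed across all tasks (illustrative)
int finished = 0;  // tasks that ran to completion (illustrative)

void Process()
{
    // TryTake without a timeout returns false at once when the collection is empty.
    while (bc3.TryTake(out _))
    {
        Interlocked.Increment(ref taken);
    }
    Console.WriteLine("Thread {0} finished", Thread.CurrentThread.ManagedThreadId);
    Interlocked.Increment(ref finished);
}

var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
    tasks.Add(Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning));
}
Task.WaitAll(tasks.ToArray());   // no task throws, so nothing needs to be caught
```

    This relies on the producer having finished before the tasks start. If items can still arrive, a task could see a momentarily empty collection and stop too early; GetConsumingEnumerable() together with CompleteAdding() handles that case.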
    

  • Cost of synchronization

    Here is one of my outputs with 5 tasks and 100,000,000 items (times in milliseconds):

    Thread 11
    Thread 13
    Thread 12
    Thread 14
    Thread 15
    Elapsed Time 12878
    Elapsed Time 13122
    Elapsed Time 13128
    Elapsed Time 13128
    Elapsed Time 13128
    Run: 13140
    

    Compare this to a single task:

    Thread 12
    Elapsed Time 10147
    Run: 10149
    

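    This is because only one Take() call can remove an item at a time. The tasks spend their time waiting on each other and on the collection's internal synchronization, and since Process() does no work with the items it takes, the extra tasks only add overhead.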
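    If the goal is simply to process items that are all known up front, one way to avoid per-item synchronization is to give each worker a contiguous range of the data with Partitioner.Create and Parallel.ForEach. This is a different technique from BlockingCollection, shown only as an option: each worker keeps a local total and synchronizes once at the end. A sketch, assuming .NET 6+ top-level statements, with the items being the integers 0 to 99999 and the per-item work (summing) purely illustrative:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

const int n = 100000;
int[] data = new int[n];
for (int i = 0; i < n; i++)
{
    data[i] = i;
}

long total = 0;

// Partitioner.Create(0, n) splits the index range into chunks. Each worker handles
// whole chunks with a private running sum, so there is no lock per item, only one
// Interlocked.Add per worker at the end.
Parallel.ForEach(
    Partitioner.Create(0, n),
    () => 0L,                                   // per-worker partial sum
    (range, loopState, localSum) =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
        {
            localSum += data[i];
        }
        return localSum;
    },
    localSum => Interlocked.Add(ref total, localSum));

Console.WriteLine($"Sum = {total}");
```

    Whether this beats a single thread depends on how much work is done per item; with almost no work per item, a plain loop may still be faster.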