I have a BlockingCollection of integers.
Below is the code I am trying to develop to remove the items concurrently from the BlockingCollection.
static void Produce()
{
    for (int i = 0; i < 100000; i++)
    {
        bc3.Add(i);
    }
    Run();
}

static void Run()
{
    for (int i = 0; i < 5; i++)
    {
        Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
    }
}

static void Process()
{
    var stopWatch = new Stopwatch();
    stopWatch.Start();
    while (bc3.Count != 0)
    {
        bc3.Take();
    }
    stopWatch.Stop();
    Console.WriteLine("Thread {0}", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("Elapsed Time {0}", stopWatch.Elapsed.Seconds);
}
Is this the correct way to remove items faster by creating 5 tasks?
Your measurement results are wrong because you use stopWatch.Elapsed.Seconds instead of stopWatch.ElapsedMilliseconds.
Elapsed.Seconds is only the whole-seconds component of the elapsed time (0 to 59). It drops the milliseconds as well as the minutes, hours, etc.
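A small sketch of the difference, using a fixed TimeSpan in place of a real Stopwatch reading so that the values are predictable. The class name ElapsedDemo and the sample duration are only illustrative assumptions:

```csharp
using System;

static class ElapsedDemo
{
    static void Main()
    {
        // 1 minute, 5 seconds and 250 milliseconds, standing in for stopWatch.Elapsed.
        var elapsed = new TimeSpan(0, 0, 1, 5, 250);

        // Seconds component only: the minute and the 250 ms are lost.
        Console.WriteLine(elapsed.Seconds);            // 5

        // Whole duration in milliseconds, which is what
        // stopWatch.ElapsedMilliseconds reports for a Stopwatch.
        Console.WriteLine(elapsed.TotalMilliseconds);  // 65250
    }
}
```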
No, this is not the correct way of removing items from a BlockingCollection. The statement that does not work is bc3.Count != 0.
All 5 tasks may check this condition at the same time and find that there is 1 item left. All 5 then call bc3.Take().
One task is able to remove the item. The other 4 tasks block inside Take() and wait for an item that never arrives.
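The effect can be reproduced on a single thread, as a rough sketch. It uses TryTake with a timeout so that the program returns instead of hanging. The class name, the value 42 and the 100 ms timeout are arbitrary choices for illustration:

```csharp
using System;
using System.Collections.Concurrent;

static class EmptyTakeDemo
{
    // Puts one item into a collection that is NOT marked complete and then
    // tries to remove two. Returns whether the second attempt got an item.
    public static bool SecondTakeSucceeds()
    {
        var items = new BlockingCollection<int>();
        items.Add(42);

        int value;
        // The first attempt gets the only item.
        items.TryTake(out value, TimeSpan.FromMilliseconds(100));
        // The second attempt finds nothing and gives up after the timeout.
        // A plain Take() would block here indefinitely.
        return items.TryTake(out value, TimeSpan.FromMilliseconds(100));
    }

    static void Main()
    {
        Console.WriteLine(SecondTakeSucceeds()); // False
    }
}
```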
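One way to solve this is to call bc3.CompleteAdding() in Produce(), after the loop that adds the items. A Take() on a collection that is empty and marked complete then throws an InvalidOperationException instead of blocking forever.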
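As an alternative to checking Count and calling Take(), each consumer can loop over GetConsumingEnumerable(), which ends by itself once the collection is empty and marked complete. This is only a sketch under assumptions, not the code from the question: the names ConsumeDemo, Drain and items are made up, and the per-task counter exists only to show that every item is removed exactly once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class ConsumeDemo
{
    // Fills a collection, marks it complete, then drains it with the given
    // number of consumer tasks. Returns how many items each task removed.
    public static int[] Drain(int itemCount, int consumerCount)
    {
        var items = new BlockingCollection<int>();
        for (int i = 0; i < itemCount; i++)
        {
            items.Add(i);
        }
        // No more items will be added, so consumers may stop once it is empty.
        items.CompleteAdding();

        var consumers = new Task<int>[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            consumers[c] = Task.Factory.StartNew(() =>
            {
                int taken = 0;
                // Ends without an exception when the collection is empty and complete.
                foreach (int item in items.GetConsumingEnumerable())
                {
                    taken++;
                }
                return taken;
            }, TaskCreationOptions.LongRunning);
        }

        Task.WaitAll(consumers);
        return consumers.Select(t => t.Result).ToArray();
    }

    static void Main()
    {
        int[] counts = Drain(100000, 5);
        Console.WriteLine("Total removed: {0}", counts.Sum()); // Total removed: 100000
    }
}
```

How the items are split between the 5 tasks varies from run to run. Only the total is fixed.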
Run() only starts the tasks and returns immediately; it does not wait for any of them. The tasks run on background threads, so if the program ends before they have finished, you may see fewer than 5 completion messages.
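To fix this, keep the tasks in a list and wait for all of them:
static void Run()
{
    var tasks = new List<Task>();
    for (int i = 0; i < 5; i++)
    {
        tasks.Add(Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning));
    }
    try
    {
        // Block until all 5 tasks have finished.
        Task.WaitAll(tasks.ToArray());
    }
    catch (AggregateException)
    {
        // A task that calls Take() after the collection is empty and marked
        // complete throws InvalidOperationException; WaitAll wraps it here.
    }
}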
Here's one of my outputs with 5 tasks and 100000000 items (elapsed times in milliseconds):
Thread 11
Thread 13
Thread 12
Thread 14
Thread 15
Elapsed Time 12878
Elapsed Time 13122
Elapsed Time 13128
Elapsed Time 13128
Elapsed Time 13128
Run: 13140
Compare this to a single task:
Thread 12
Elapsed Time 10147
Run: 10149
The 5 tasks are slower than the single task because only one Take() call can remove an item at a time, and the synchronization between the tasks takes additional time.