I have a situation in which I have a producer/consumer scenario. The producer never stops, which means that even if there is a time where there are no items in the BC, further items can be added later.
Moving from .NET Framework 3.5 to 4.0, I decided to use a BlockingCollection
as a concurrent queue between the consumer and the producer. I even added some parallel extensions so I could use the BC with a Parallel.ForEach
.
The problem is that, in the consumer thread, I need to have a kind of an hybrid model:
Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
foreach
, I execute all the tasks that dont depend between each other.A little example in pseudo code follows:
producer:
//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
//The object to add has a property with the sequence number
_concurrentCollection.TryAdd(scannedPage);
}
consumer:
private void Init()
{
_cancelTasks = false;
_checkTask = Task.Factory.StartNew(() =>
{
while (!_cancelTasks)
{
//BlockingCollections with Parallel ForEach
var bc = _concurrentCollection;
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
}
});
}
Obviously, this cant work as it is because the .GetConsumingEnumerable()
blocks until there is another item in the BC. I asume I could do it with tasks and just fire 4 or 5 task in a same batch, but:
TryTake
inside each one so if there is nothing to take they don't block, because I don't know if I can always reach the number of items from the BC as the batch of tasks, for example, just one item left in the BC and a batch of 4 tasks) ?I think I follow what you're asking, why not create a ConcurrentBag and add to it while processing like this:
while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
var q = new ConcurrentBag<ScannedPage>();
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
q.Add(item);
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
//process items in your list here by sorting using some sequence key
var items = q.OrderBy( o=> o.SeqNbr).ToList();
foreach( var item in items){
...
}
}
This obviously doesn't enqueue them in the exact order they were added to the BC but you could add some sequence nbr to the ScannedPage object like Alex suggested and then sort the results after.
Here's how I'd handle the sequence:
Add this to the ScannedPage
class:
public static int _counter; //public because this is just an example but it would work.
Get a sequence nbr and assign here:
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
lock( this){ //to single thread this process.. not necessary if it's already single threaded of course.
System.Threading.Interlocked.Increment( ref ScannedPage._counter);
scannedPage.SeqNbr = ScannedPage._counter;
...
}
}