
Producer / hybrid consumer in C# using 4.0 framework classes and BlockingCollection


I have a producer/consumer scenario in which the producer never stops: even if the BlockingCollection (BC) is empty at some point, more items can be added later.

Moving from .NET Framework 3.5 to 4.0, I decided to use a BlockingCollection as a concurrent queue between the consumer and the producer. I even added some parallel extensions so I could use the BC with a Parallel.ForEach.

The problem is that, in the consumer thread, I need a kind of hybrid model:

  1. I'm always checking the BC to process any item that has arrived, with a Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc.
  2. Inside this foreach, I execute all the tasks that don't depend on each other.
  3. Here comes the problem. After parallelizing the previous tasks, I need to handle their results in the same FIFO order in which the items were in the BC. These results should be processed synchronously, on a single thread.

A little example in pseudo code follows:

producer:

// This event is triggered each time a page is scanned. A new batch of pages can be added at the scanner at any time.
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
    // The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}

consumer:

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Parallel ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode. T
                    });
                    // Here should go the code that takes the results from each task, processes them in the same FIFO order in which they entered the BC, and saves each image to a file, all in this same thread.

                }
            });
}
            

Obviously, this can't work as it is, because .GetConsumingEnumerable() blocks until there is another item in the BC. I assume I could do it with tasks and just fire 4 or 5 tasks in the same batch, but:

  1. How could I do this with tasks and still have a waiting point before the tasks start, one that blocks until there is an item to consume in the BC? I don't want to start processing if there is nothing. Once there is something in the BC, I would start the batch of 4 tasks and use TryTake inside each one so that they don't block if there is nothing to take. I can't be sure the BC always holds as many items as there are tasks in the batch (for example, one item left in the BC and a batch of 4 tasks).
  2. How could I do this and still take advantage of the efficiency that Parallel.For offers?
  3. How could I save the results of the tasks in the same FIFO order in which the items were extracted from the BC?
  4. Is there any other concurrency class better suited to this kind of hybrid processing of items in the consumer?
  5. Also, this is my first question on StackOverflow, so if you need more data or think my question is not well posed, just let me know.

Solution

  • I think I follow what you're asking. Why not create a ConcurrentBag and add to it while processing, like this:

    while (!_cancelTasks)
    {
        // BlockingCollection with Parallel.ForEach
        var bc = _concurrentCollection;
        var q = new ConcurrentBag<ScannedPage>();
        Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
        {
            ScannedPage currentPage = item;
            q.Add(item);
            // process a batch of images from the bc and check if an image has a valid barcode. T
        });
        // Here should go the code that takes the results from each task, processes them in the same FIFO order in which they entered the BC, and saves each image to a file, all in this same thread.

        // Process the collected items here, sorted by some sequence key
        var items = q.OrderBy(o => o.SeqNbr).ToList();
        foreach (var item in items)
        {
            ...
        }
    }
    

    This obviously doesn't enqueue them in the exact order they were added to the BC, but you could add a sequence number to the ScannedPage object, as Alex suggested, and then sort the results afterwards.
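    One caveat with the loop above: Parallel.ForEach over GetConsumingEnumerable() does not return until CompleteAdding() has been called on the collection, so with a producer that never stops the sorting step is never reached. A minimal sketch of one way around that, assuming it is acceptable to work in bounded batches: block until one item is available, drain a few more without blocking, process that batch in parallel, then sort it by sequence number on the calling thread. The ScannedPage stand-in, the BatchConsumer helper, the HasBarcode field, and the batch size of 4 are all hypothetical names chosen for illustration, and the barcode check is a placeholder.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's ScannedPage type.
class ScannedPage
{
    public int SeqNbr;
    public bool HasBarcode;
}

static class BatchConsumer
{
    // Blocks until at least one item is available, then drains up to maxBatch
    // items without blocking again. Returns an empty list if the token is
    // cancelled or the collection is completed and empty.
    public static List<ScannedPage> TakeBatch(
        BlockingCollection<ScannedPage> bc, int maxBatch, CancellationToken token)
    {
        var batch = new List<ScannedPage>();
        ScannedPage page;
        try
        {
            // With an infinite timeout, TryTake waits for an item and returns
            // false only when the collection is completed and empty.
            if (!bc.TryTake(out page, Timeout.Infinite, token)) return batch;
        }
        catch (OperationCanceledException)
        {
            return batch;
        }
        batch.Add(page);
        // Non-blocking TryTake: stop as soon as nothing more is queued.
        while (batch.Count < maxBatch && bc.TryTake(out page)) batch.Add(page);
        return batch;
    }

    // Runs the independent work in parallel, then restores FIFO order by SeqNbr.
    public static List<ScannedPage> ProcessBatch(List<ScannedPage> batch)
    {
        var results = new ConcurrentBag<ScannedPage>();
        Parallel.ForEach(batch, p =>
        {
            p.HasBarcode = p.SeqNbr % 2 == 0; // placeholder for the real barcode check
            results.Add(p);
        });
        return results.OrderBy(p => p.SeqNbr).ToList();
    }
}

static class Program
{
    static void Main()
    {
        var bc = new BlockingCollection<ScannedPage>();
        var cts = new CancellationTokenSource();
        for (int i = 1; i <= 10; i++) bc.Add(new ScannedPage { SeqNbr = i });
        bc.CompleteAdding(); // only so that this demo terminates

        while (true)
        {
            var batch = BatchConsumer.TakeBatch(bc, 4, cts.Token);
            if (batch.Count == 0) break;
            // The ordered results are handled here, on the consumer thread.
            foreach (var p in BatchConsumer.ProcessBatch(batch))
                Console.WriteLine("{0} barcode={1}", p.SeqNbr, p.HasBarcode);
        }
    }
}
```

    Since a single thread takes the batches from a FIFO collection, sorting within each batch is enough to keep the overall order. In a real consumer the loop would end through the cancellation token rather than CompleteAdding(). PLINQ's AsParallel().AsOrdered() is another way to keep a batch in its original order without a sort key.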

    Here's how I'd handle the sequence:

    Add this to the ScannedPage class:

    public static int _counter;  //public because this is just an example but it would work.
    

    Get a sequence number and assign it here:

    private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
    {
        lock (this)   // to single-thread this process; not necessary if it's already single-threaded, of course
        {
            // Use the value returned by Interlocked.Increment rather than re-reading the shared counter
            scannedPage.SeqNbr = System.Threading.Interlocked.Increment(ref ScannedPage._counter);
            ...
        }
    }
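    With sequence numbers assigned like this, results can also be released in order as they complete instead of being sorted after a whole batch. The following is a rough sketch of that idea, not part of the original answer: a small reorder buffer that holds out-of-order results until the next expected sequence number arrives. OrderedReleaser is a hypothetical helper, and it assumes the numbers start at 1 and have no gaps, which matches a counter that starts at 0 and is incremented before each assignment.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: releases results in strict sequence order even when
// they are added out of order. Assumes sequence numbers start at 1 with no gaps.
class OrderedReleaser<T>
{
    private readonly Dictionary<int, T> _pending = new Dictionary<int, T>();
    private readonly Action<T> _onRelease;
    private readonly object _gate = new object();
    private int _next = 1;

    public OrderedReleaser(Action<T> onRelease)
    {
        _onRelease = onRelease;
    }

    public void Add(int seqNbr, T result)
    {
        lock (_gate)
        {
            _pending[seqNbr] = result;
            T ready;
            // Release every consecutive result that is now available.
            while (_pending.TryGetValue(_next, out ready))
            {
                _pending.Remove(_next);
                _next++;
                _onRelease(ready); // runs under the lock, so one at a time and in order
            }
        }
    }
}

static class Program
{
    static void Main()
    {
        var released = new List<int>();
        var releaser = new OrderedReleaser<int>(released.Add);

        // Workers finish in an arbitrary order; the releaser restores the sequence.
        Parallel.For(1, 21, i => releaser.Add(i, i));

        Console.WriteLine(string.Join(",", released));
    }
}
```

    The callback here runs on whichever worker thread completes the next result in line, serialized by the lock, so it is ordered but not pinned to one thread. If the results must be handled on a single dedicated thread, the callback could add them to a second BlockingCollection that the dedicated thread consumes.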