Search code examples
c#task-parallel-libraryblockingcollection

BlockingCollection<T> batching using TPL DataFlow


Is there a way to batch a collection of items from the blocking collection. E.G.

I have a messaging bus publisher calling blockingCollection.Add()

And a consuming thread which is created like this:

Task.Factory.StartNew(() =>
        {
            foreach (string value in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine(value);
                }
        });

However, I only want the Console to write after the blocking collection has 10 items on it, whereas GetConsumingEnumerable() always fires after each item is added. I could write my own queue for this but I'd like to use the blocking collection if possible?


Solution

  • Not sure what the project requirements are but I'd recommend TPL DataFlow BatchBlock.

    You would instantiate a BatchBlock<string>, bind it to an ActionBlock<string> and then post to the batch block.

    A pseudo code might look something like this:

    var bb = new BatchBlock<string>(10);
    var ab = new ActionBlock<string[]>(msgArray=>{ 
        foreach(var msg in msgArray) 
            Console.Writeline(msg);
    });
    
    bb.LinkTo(ab);
    
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
          bb.Post(value);
    }
    

    Using DataFlow you might even want to replace the BlockingCollection with a BufferBlock or just post to the buffer block directly without first adding to the blocking collection, since batch block is already thread-safe.