Search code examples
c#multithreadingparallel-processingtask-parallel-libraryparallel-extensions

Can I TryTake a group of items from a BlockingCollection<T>?


In an application I'm working on, thousands of updates are being received per second. Reflecting these updates on the UI immediately and one by one is performance overkill.

The following code yields very bad performance, because handling each update requires invoking the UI thread and adding a new item to an ObservableCollection which in turn triggers the CollectionChanged event.

foreach (var update in updates.GetConsumingEnumerable())
{
    // Handle Update
}

What I'm trying to do is to make the consumer wait for a little bit of time (i.e. 50 millisecond) to give a chance to the publisher to add more items, and then handle these updates in chunks.

Currently, I'm using the following code, but I find it hard to predict for how long the items will stay in the collection before being consumed, I'm afraid this approach may produce another performance bottleneck.

List<Tuple<T, List<string>>> updatesToHandle = new List<Tuple<T, List<string>>>();
while (!updates.IsAddingCompleted)
{
    Tuple<T, List<string>> item = null;
    if (updates.TryTake(out item, 5)) // Try Take for 5 Milliseconds
    {
        updatesToHandle.Add(item);
    }
    else
    {
        var newItems = new List<T>(updatesToHandle.Count);
        var updatedItems = new List<Tuple<T, List<string>>>(updatesToHandle.Count);
        foreach (var update in updatesToHandle)
        {
            try
            {
                // Handle Update
            }
            finally
            {
                updatesToHandle.Clear();
                Thread.Sleep(50);
            }
        }
    }
}

Solution

  • I would consider using Reactive Extensions ReactiveExtensions. This link solves a similar problem Stock Trading Example