Consumer with timeout and under specific condition


The BlockingCollection<T> class provides an easy way to implement the producer/consumer pattern, but unfortunately doesn't have a feature I need. It allows me to set a timeout while waiting to consume an element, but does not provide a way to restrict which item is removed from the collection.

How can I implement a class similar to BlockingCollection<T>, but which allows me to specify the condition under which items should be taken?

For example, I need to take a Bar item only if its Amount satisfies a specific condition:

public class Bar 
{
    public Int32 Amount { get; set; }
}

public class Program 
{
    public static void Main()
    {
        ToDoCollection<Bar> ToDoCollection = new ToDoCollection<Bar>();
        int timeout = 10000;

        // this doesn't work, that's why I'm asking for your help
        Bar value = ToDoCollection.TryTake().Where(p => p.Amount != 5);

        // Here, I need to wait up to 10 seconds while trying to take an item from the
        // collection that satisfies a specific condition, e.g. Bar.Amount must be greater than zero
    }
}

Solution

  • If I understand correctly, you want a collection that has the following behavior:

    1. Allows a thread to attempt to retrieve an item matching a specific condition, blocking the thread until such an item is present.
    2. Allows a thread to specify a timeout for the operation described in point 1.

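    The existing BlockingCollection<T> class cannot do this on its own: its Take and TryTake methods always remove the next available item and accept no predicate for selecting which item to take.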
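
    To make the gap concrete, here is a minimal sketch of what BlockingCollection<T> offers out of the box. TryTake(out item, millisecondsTimeout) waits up to the timeout, but it hands back the next available item (FIFO with the default ConcurrentQueue<T> backing store), so a condition can only be checked after the item has already been removed. The Bar class mirrors the one in the question; the Demo class and the TakeNext helper are illustrative names, not part of any library.

```csharp
using System;
using System.Collections.Concurrent;

public class Bar
{
    public int Amount { get; set; }
}

public static class Demo
{
    // Hypothetical helper: returns the Amount of whatever item TryTake
    // hands back, or null if the timeout expires first.
    public static int? TakeNext(BlockingCollection<Bar> items, int timeoutMs)
    {
        // TryTake blocks for up to timeoutMs but accepts no predicate:
        // it removes the next available item, whatever its Amount is.
        return items.TryTake(out Bar item, timeoutMs) ? item.Amount : (int?)null;
    }

    public static void Main()
    {
        var items = new BlockingCollection<Bar>();
        items.Add(new Bar { Amount = 5 });
        items.Add(new Bar { Amount = 7 });

        // We would like the item with Amount != 5, but we get the first one added.
        Console.WriteLine(TakeNext(items, 100));          // 5
        Console.WriteLine(TakeNext(items, 100));          // 7
        Console.WriteLine(TakeNext(items, 100).HasValue); // False (empty, timed out)
    }
}
```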

    You can implement your own collection type, adding whatever specific features you need. For example:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;

    class BlockingPredicateCollection<T>
    {
        private readonly object _lock = new object();
        private readonly List<T> _list = new List<T>();
    
        public void Add(T t)
        {
            lock (_lock)
            {
                _list.Add(t);
    
                // Wake any waiting threads, so they can check if the element they
                // are waiting for is now present.
                Monitor.PulseAll(_lock);
            }
        }
    
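        // Removes and returns the first item matching "predicate", waiting up to
        // "timeout" for one to be added. Returns false if no match arrives in time.
        public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
        {
            // Track total elapsed time so repeated wake-ups share a single deadline.
            Stopwatch sw = Stopwatch.StartNew();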
    
            lock (_lock)
            {
                int index;
    
                while ((index = _list.FindIndex(predicate)) < 0)
                {
                    TimeSpan elapsed = sw.Elapsed;
    
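                    // Give up once the deadline has passed. Otherwise Wait() releases
                    // the lock while blocked and returns false if the remaining time
                    // runs out before the lock is reacquired.
                    if (elapsed > timeout ||
                        !Monitor.Wait(_lock, timeout - elapsed))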
                    {
                        t = default(T);
                        return false;
                    }
                }
    
                t = _list[index];
                _list.RemoveAt(index);
                return true;
            }
        }
    }
    

    Then, for example:

    BlockingPredicateCollection<Bar> toDoCollection = new BlockingPredicateCollection<Bar>();
    int timeout = 10000;
    Bar value;
    
    if (toDoCollection.TryTake(out value,
        p => p.Amount != 5, TimeSpan.FromMilliseconds(timeout)))
    {
        // do something with "value"
    }
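
    To see the blocking and timeout behaviour end to end, the sketch below pairs the collection with a producer running on another thread. It repeats the class from above in condensed form so that the file compiles on its own; the Demo class, the 200 ms delay, and the sample Amount values are assumptions chosen purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class Bar
{
    public int Amount { get; set; }
}

// Condensed copy of the collection shown above, so this file compiles on its own.
public class BlockingPredicateCollection<T>
{
    private readonly object _lock = new object();
    private readonly List<T> _list = new List<T>();

    public void Add(T t)
    {
        lock (_lock)
        {
            _list.Add(t);
            Monitor.PulseAll(_lock); // wake waiters so they re-check their predicate
        }
    }

    public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
    {
        Stopwatch sw = Stopwatch.StartNew();
        lock (_lock)
        {
            int index;
            while ((index = _list.FindIndex(predicate)) < 0)
            {
                TimeSpan elapsed = sw.Elapsed;
                if (elapsed > timeout || !Monitor.Wait(_lock, timeout - elapsed))
                {
                    t = default(T);
                    return false;
                }
            }
            t = _list[index];
            _list.RemoveAt(index);
            return true;
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var toDo = new BlockingPredicateCollection<Bar>();

        // Producer: first adds an item the consumer rejects, then one it accepts.
        Task producer = Task.Run(() =>
        {
            toDo.Add(new Bar { Amount = 5 });
            Thread.Sleep(200);
            toDo.Add(new Bar { Amount = 7 });
        });

        // Consumer: blocks until an item with Amount != 5 arrives (up to 10 s).
        bool found = toDo.TryTake(out Bar value, p => p.Amount != 5, TimeSpan.FromSeconds(10));
        Console.WriteLine(found ? "took Amount = " + value.Amount : "timed out");

        producer.Wait();

        // Nothing with Amount > 100 is ever added, so this call gives up after about 100 ms.
        bool second = toDo.TryTake(out Bar none, p => p.Amount > 100, TimeSpan.FromMilliseconds(100));
        Console.WriteLine(second); // False
    }
}
```

    Note that the rejected item (Amount = 5) stays in the collection, where a later TryTake call with a different predicate can still pick it up.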