Tags: c#, .net-6.0, producer-consumer, concurrentdictionary, blockingcollection

Best way to implement a consumer queue that you can remove items from sequentially (.NET 6)


New poster here, so I hope this makes sense...

I need to create a collection that I can remove items from in sequence (basically stock market time series data). The data producer is multi-threaded and doesn't guarantee that the data will come in sequence.

I've looked around for a solution, but the only thing I can come up with is to create my own custom dictionary, backed by a ConcurrentDictionary and implementing the IProducerConsumerCollection<T> interface so it can be used with BlockingCollection<T>.

The code below does work, but it throws this exception:

System.InvalidOperationException: The underlying collection was modified from outside of the BlockingCollection

when consuming with the GetConsumingEnumerable() foreach loop and the next key in the sequence is not present in the dictionary. In that case, I would like to wait for a specified amount of time and then try to take the item from the queue again.

My question is:

  • What's the best way to handle the error when the key is not present? At the moment, it seems that handling the error would require exiting the loop. Perhaps GetConsumingEnumerable() is not the right way to consume, and a while loop would work better?

Code is below; any help or ideas are much appreciated.

IProducerConsumerCollection implementation:

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

public abstract class BlockingDictionary<TKey, TValue> : IProducerConsumerCollection<KeyValuePair<TKey, TValue>> where TKey : notnull
{
    protected ConcurrentDictionary<TKey, TValue> _dictionary = new ConcurrentDictionary<TKey, TValue>();

    int ICollection.Count => _dictionary.Count;

    bool ICollection.IsSynchronized => false;

    object ICollection.SyncRoot => throw new NotSupportedException();

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException(nameof(array));
        }
        // ConcurrentDictionary.ToArray() takes a consistent snapshot before copying.
        _dictionary.ToArray().CopyTo(array, index);
    }

    void ICollection.CopyTo(Array array, int index)
    {
        if (array == null)
        {
            throw new ArgumentNullException(nameof(array));
        }
        ((ICollection)_dictionary.ToArray()).CopyTo(array, index);
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return _dictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return _dictionary.ToArray();
    }

    bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return _dictionary.TryAdd(item.Key, item.Value);
    }

    public virtual bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Default policy: remove whichever entry is enumerated first (no ordering guarantee).
        // If another thread removed that entry in the meantime, try the next one.
        foreach (var pair in _dictionary)
        {
            if (_dictionary.TryRemove(pair.Key, out var value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }

        item = default;
        return false;
    }
}

Time-sequence queue implementation (inherits from the class above):

public class TimeSequenceQueue<T> : BlockingDictionary<DateTime, T>
{
    private DateTime _previousTime;
    private DateTime _nextTime;
    private readonly int _intervalSeconds;

    public TimeSequenceQueue(DateTime startTime, int intervalSeconds)
    {
        _intervalSeconds = intervalSeconds;
        _previousTime = startTime;
        _nextTime = startTime;
    }

    public override bool TryTake(out KeyValuePair<DateTime, T> item)
    {
        // Only the entry keyed at _nextTime may leave the queue (O(1) lookup, no full scan).
        if (_dictionary.TryRemove(_nextTime, out var value))
        {
            item = new KeyValuePair<DateTime, T>(_nextTime, value);
            _previousTime = _nextTime;
            _nextTime = _nextTime.AddSeconds(_intervalSeconds);
            return true;
        }

        // The next timestamp has not arrived yet. Returning false while other items are
        // stored is what makes BlockingCollection throw InvalidOperationException.
        item = default;
        return false;
    }
}

Usage:

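// The start time and interval below are placeholder values.
BlockingCollection<KeyValuePair<DateTime, object>> _queue = new BlockingCollection<KeyValuePair<DateTime, object>>(
    new TimeSequenceQueue<object>(startTime: DateTime.Today, intervalSeconds: 60));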

Consuming loop (started on a new thread):

foreach (var item in _queue.GetConsumingEnumerable())
{
    // feed downstream
}
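The exception comes from how BlockingCollection<T> takes an item: it first waits on its own internal count of added items, and only then calls TryTake on the underlying collection. If that call returns false while the internal count says an item is available, BlockingCollection<T> throws InvalidOperationException instead of waiting again. The minimal sketch below reproduces this with a hypothetical RefusingCollection<T> that stores items but always refuses to hand them out, which is what TimeSequenceQueue<T> does while the next timestamp is missing. The class names RefusingCollection and Repro are illustrative only.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical collection that accepts items but refuses every TryTake,
// mimicking TimeSequenceQueue<T> while the next timestamp is missing.
public class RefusingCollection<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentQueue<T> _inner = new ConcurrentQueue<T>();

    public bool TryAdd(T item) { _inner.Enqueue(item); return true; }
    public bool TryTake(out T item) { item = default!; return false; } // always refuses
    public int Count => _inner.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public void CopyTo(T[] array, int index) => _inner.CopyTo(array, index);
    void ICollection.CopyTo(Array array, int index) => ((ICollection)_inner).CopyTo(array, index);
    public T[] ToArray() => _inner.ToArray();
    public IEnumerator<T> GetEnumerator() => _inner.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class Repro
{
    // Returns the exception message, or "no exception" if nothing was thrown.
    public static string Run()
    {
        var queue = new BlockingCollection<int>(new RefusingCollection<int>());
        queue.Add(42); // BlockingCollection's internal count is now 1

        try
        {
            // A timeout does not help: the internal count says an item is available,
            // so no waiting happens; the underlying TryTake returns false and
            // BlockingCollection throws.
            queue.TryTake(out _, TimeSpan.FromSeconds(1));
            return "no exception";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }
}
```

So any consuming approach that goes through BlockingCollection<T> (GetConsumingEnumerable, Take, or a timed TryTake) hits the same check whenever the custom TryTake declines to return a stored item.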

Solution

  • When using the GetConsumingEnumerable() for loop, and the next key in the sequence is not present in the dictionary [...] I would like to wait for a specified amount of time and then attempt to take the item from the queue again.

    I will try to answer this question generally, without paying too much attention to the specifics of your problem. So let's say that you are consuming a BlockingCollection<T> like this:

    foreach (var item in collection.GetConsumingEnumerable())
    {
        // Do something with the consumed item.
    }
    

    ...and you want to avoid waiting indefinitely for an item to arrive. You want to wake up every 5 seconds and do something, before waiting/sleeping again. Here is how you could do it:

    while (!collection.IsCompleted)
    {
        bool consumed = collection.TryTake(out var item, TimeSpan.FromSeconds(5));
        if (consumed)
        {
            // Do something with the consumed item.
        }
        else
        {
            // Do something before trying again to take an item.
        }
    }
    

    The above pattern imitates the actual source code of the BlockingCollection<T>.GetConsumingEnumerable method.

    If you want to get fancy, you could incorporate this functionality in a custom extension method for the BlockingCollection<T> class, like this:

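    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;

    public static class BlockingCollectionExtensions
    {
        // Yields (false, default) on each timeout; ends when the collection is completed and empty.
        public static IEnumerable<(bool Consumed, T Item)> GetConsumingEnumerable<T>(
            this BlockingCollection<T> source, TimeSpan timeout)
        {
            while (!source.IsCompleted)
            {
                bool consumed = source.TryTake(out var item, timeout);
                yield return (consumed, item);
            }
        }
    }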
    

    Usage example:

    foreach (var (consumed, item) in collection.GetConsumingEnumerable(
        TimeSpan.FromSeconds(5)))
    {
        // Do something depending on whether an item was consumed or not.
    }
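Applied to the question's specific problem, one option (not part of the answer above, only a sketch under stated assumptions) is to drop the custom IProducerConsumerCollection<T> entirely: let the BlockingCollection<T> use its default ConcurrentQueue<T>, and do the reordering on the consumer side with the timed TryTake loop shown above. The names SequencedConsumer, ConsumeInOrder, and the onTimeout callback are hypothetical. The sketch assumes, as TimeSequenceQueue<T> does, that timestamps are exact steps of a fixed interval from a known start time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper: reorders time-stamped items on the consumer side, so the
// BlockingCollection keeps its default ConcurrentQueue<T> and never sees a refused TryTake.
public static class SequencedConsumer
{
    // Yields items in the order start, start + interval, start + 2 * interval, ...
    // Items that arrive early are parked in `pending` until their turn comes.
    // onTimeout(next) runs whenever TryTake gives up without an item, i.e. after
    // `timeout` elapses (or when adding is completed while it was waiting).
    public static IEnumerable<KeyValuePair<DateTime, T>> ConsumeInOrder<T>(
        BlockingCollection<KeyValuePair<DateTime, T>> source,
        DateTime start, TimeSpan interval, TimeSpan timeout, Action<DateTime> onTimeout)
    {
        var pending = new Dictionary<DateTime, T>();
        DateTime next = start;

        while (!source.IsCompleted)
        {
            if (source.TryTake(out var pair, timeout))
            {
                pending[pair.Key] = pair.Value; // a duplicate timestamp overwrites the earlier item
            }
            else
            {
                onTimeout(next); // still waiting for the item stamped `next`
            }

            // Release every parked item that is now contiguous with the sequence.
            while (pending.Remove(next, out var value))
            {
                yield return new KeyValuePair<DateTime, T>(next, value);
                next += interval;
            }
        }
        // Anything still in `pending` here sits behind a gap that was never filled; it is dropped.
    }
}
```

Because the BlockingCollection<T> only ever sees ordinary adds and takes, the "modified from outside" exception cannot occur, and the onTimeout callback is the place for the "wait, then try again" logic from the question (for example logging the missing timestamp, or deciding to skip it by advancing the sequence). How to handle permanent gaps and duplicate timestamps in real market data is left open by this sketch.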