Search code examples
c#concurrencyqueuethread-safetyconcurrent-collections

Fixed size ConcurrentQueue get element at index


I have found an old code which implement a thread safe queue. and I tried to create a new implementation using ConcurrentQueue

Old code

public class BoundedQueue<T> where T : class
{
    private readonly Queue<T> _fixedSizeBuffer;
    private object _bufferLock;
    private int _limit;

    public BoundedQueue(int limit)
    {
        _fixedSizeBuffer = new Queue<T>();
        _limit = limit;
        _bufferLock = new object();

        for (int i = 0; i < _limit; i++)
        {
            _fixedSizeBuffer.Enqueue(null);
        }
    }

    public void AddElementToBuffer(T element)
    {
        lock (_bufferLock)
        {
            _fixedSizeBuffer.Enqueue(element);
            while (_fixedSizeBuffer.Count > _limit)
            {
                _fixedSizeBuffer.Dequeue();
            }
        }
    }

    public T GetElementAt(int index)
    {
        T element;
        lock (_bufferLock)
        {
            element = _fixedSizeBuffer.ElementAt(_limit - index - 1);
        }
        return element;
    }
}

My new code:

public class FixedSizeConcurrentQueue<T> where T : class
{
    private readonly ConcurrentQueue<T> _fixedSizeBuffer;
    private int _maxSize;
    public FixedSizeConcurrentQueue(int maxSize)
    {
        _maxSize = maxSize;
        for (int i = 0; i < _maxSize; i++)
        {
            _fixedSizeBuffer.Enqueue(null);
        }
    }

    public void AddElementToBuffer(T element)
    {
        _fixedSizeBuffer.Enqueue(element);
        while (_fixedSizeBuffer.Count > _maxSize)
        {
            T item;
            _fixedSizeBuffer.TryDequeue(out item);
        }
    }
    public T GetElementAt(int index)
    {
        var element = _fixedSizeBuffer.ElementAt(_maxSize - index - 1);
        return element;
    }
}

My question is about the ElementAt() function or should I better call it TryGetElement().
in the old code the code used a lock in order to sync the different threads.
However in the new code I removed it since I know it is a bad practice to use lock when you have concurrent collection.
So if the index is not found because the queue is empty for example I would get an exception.
So should I wrap it with try catch?
please explain how you would do it.


Solution

  • Your maxsize variable is not synced across threads, so there's a threading issue there. Also, a ConcurrentQueue already has an ElementAtOrDefault function that will automatically return null if the index does not exist.

    I would update to inherit from ConcurrentQueue itself instead.

    public class FixedSizeConcurrentQueue<T> : ConcurrentQueue<T>
    {
        public int MaxSize { get; }
    
        public FixedSizeConcurrentQueue(int maxSize)
        {
            MaxSize = maxSize;
        }
    
        public new void Enqueue(T obj)
        {
            base.Enqueue(obj);
    
            while (base.Count > MaxSize)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    
        public T GetElementAt(int index)
        {
            return base.ElementAtOrDefault(index);;
        }
    }