c# multithreading synchronization queue lock-free

Thread-safe limited-size queue without using lock


I'm trying to write the queue described in the title, but I get deadlocks and other multithreading problems. I want to use Interlocked.CompareExchange to avoid using lock. But this code doesn't work as expected: it just wipes the entire queue. What am I doing wrong here?

public class FixedSizedQueue<T> : IEnumerable<T>
{
    readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    public int Limit { get; set; }

    public FixedSizedQueue(int limit)
    {
        Limit = limit;
    }

    public void Enqueue(T obj)
    {
        _queue.Enqueue(obj);
        if (_queue.Count <= Limit)
            return;
        int count = _queue.Count;
        if (_queue.Count != Interlocked.CompareExchange(ref count, count, _queue.Count))
        {
            T overflow;
            while (_queue.TryDequeue(out overflow))
            {

            }
        }
    }

    public T[] ToArray()
    {
        return _queue.ToArray();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _queue.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Maybe I just need another thread that will just cut the queue...


Solution

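  • Interlocked.CompareExchange is meaningless on the local variable count: it lives on the stack and is only ever accessed by a single thread. I guess you tried to use this method on _queue.Count, but that failed to compile because .Count is a property, not a variable that can be passed by ref. Note also that the loop while (_queue.TryDequeue(out overflow)) has no exit condition other than an empty queue, so whenever it is entered it removes every element, which is why the whole queue gets wiped. So you need to define your own counter as a field in your class.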

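    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    public class FixedSizedQueue<T> : IEnumerable<T>
    {
        readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        int CountShadow = 0; // Shadow counter used to enforce the size limit.
        public int Limit { get; set; }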
    
        public FixedSizedQueue(int limit)
        {
            Limit = limit;
        }
    
        public void Enqueue(T obj)
        {
            /* Reserve a slot in the shadow counter first to enforce the limit. */
            int count = CountShadow;
            while(true)
            {
                if(count >= Limit) return; // Adding an element would violate the limit
                int countOld = Interlocked.CompareExchange(ref CountShadow, count + 1, count);
                if(countOld == count) break; // Successful update
                count = countOld; // Another thread changed the counter; retry with the fresh value
            }
            _queue.Enqueue(obj); // This will update real counter.
        }
        ...
    }
    

    Also, you need to write your own setter for the Limit property that maintains the invariant CountShadow <= Limit, or simply forbid changing that property after the object is constructed.
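
For completeness, here is a minimal sketch of how the pieces above could fit together once a dequeue side is added. It is not a definitive implementation. The class name BoundedQueue, the methods TryEnqueue and TryDequeue, and the field name _countShadow are hypothetical names chosen for illustration. The sketch assumes the simpler of the two options for Limit (read-only after construction) and assumes the shadow counter should be decremented only after an element has actually been removed. Interlocked.CompareExchange(ref location, value, comparand) returns the value that was in location before the call, so the reservation succeeded exactly when the returned value equals the value we expected.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical name; a sketch of the shadow-counter idea, not the original class.
public class BoundedQueue<T> : IEnumerable<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private int _countShadow; // Number of reserved slots; never exceeds Limit.

    // Assumption: the limit is fixed at construction, so no setter is needed.
    public int Limit { get; }

    public BoundedQueue(int limit)
    {
        if (limit < 0) throw new ArgumentOutOfRangeException(nameof(limit));
        Limit = limit;
    }

    // Returns false (and stores nothing) when the queue is already full.
    public bool TryEnqueue(T item)
    {
        int count = Volatile.Read(ref _countShadow);
        while (true)
        {
            if (count >= Limit) return false; // No free slot left.
            // Try to move the counter from count to count + 1.
            int seen = Interlocked.CompareExchange(ref _countShadow, count + 1, count);
            if (seen == count) break; // Slot reserved.
            count = seen;             // Lost the race; retry with the fresh value.
        }
        _queue.Enqueue(item); // The slot is reserved, so this cannot exceed Limit.
        return true;
    }

    // Releases the slot only after an element has really been removed.
    public bool TryDequeue(out T item)
    {
        if (!_queue.TryDequeue(out item)) return false;
        Interlocked.Decrement(ref _countShadow);
        return true;
    }

    public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

Because the counter is incremented before the element is stored and decremented after it is removed, the counter is always at least the real number of stored elements, so the queue never holds more than Limit items. The trade-off is that TryEnqueue can briefly report a full queue while another thread is in the middle of a dequeue.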