Tags: c#, concurrency, memory-barriers, lock-free, lockless

Do I need a MemoryBarrier to increment the index in a lockless collection?


I'm trying to implement a simple yet fast producer/consumer collection that can be copied from another thread. To make it as fast as possible, I am not using any locking mechanisms.

Basically the code looks like this (simplified):

var pc = new ProducerConsumer();
pc.Add(1);

var producerTask = Task.Run(() =>
{
    var loop = 1;
    while (true)
    {
        pc.Add(loop);
        if (pc.count == 5000) // example wraparound
        {
            loop++;
            pc.array[0] = loop;
            pc.count = 1; // clear to one element for simplicity of this example
        }
    }
});

var consumerTask = Task.Run(() =>
{
    while (true)
    {
        Console.WriteLine(pc.ToArray().First());
    }
});

Task.WaitAll(producerTask, consumerTask);

class ProducerConsumer
{
    public volatile int count = 0;
    public readonly int[] array = new int[5000];

    // not meant to be thread-safe, designed to be used by just one producer
    public void Add(int value)
    {
        array[count] = value;
        count++;
    }

    // should be thread-safe, used by multiple readers
    public int[] ToArray() => array[..count];
}

The idea is that the reader thread(s) should be free to access anything in array[0..count] at any time and get a valid value. The size of the array is constant.

It seems to work fine and is 6x faster than ConcurrentBag<T> in benchmarks, but do I have a guarantee that these two lines:

array[count] = value;
count++;

will always execute in this specific order and will not, for example, be optimized into:

array[count++] = value;

by a compiler or CPU?

Do I need Interlocked.MemoryBarrier() between these two instructions?

Do I need to mark count as volatile?

I would like to avoid unnecessary instructions, as a simple MemoryBarrier slows down the loop by 25%. I tested it for some time with all optimizations enabled, in Release mode without a debugger, and it seems that neither makes any difference.


Actually, List<T> from .NET would work great for my needs, but for some reason .Add is implemented as:

_version++;
T[] array = _items;
int size = _size;
if ((uint)size < (uint)array.Length)
{
    _size = size + 1;
    array[size] = item;
}
else
{
    AddWithResize(item);
}

Is there a reason why it was implemented like this? Simply swapping lines 6 and 7 of the snippet (_size = size + 1; and array[size] = item;) could improve the thread safety of the generic List (even though I know it's not meant to be thread-safe).


Solution

  • (This answer was posted during the 2nd revision of the question, before the OP added the volatile keyword to the ProducerConsumer.count field.)

    You don't need a full fence (Interlocked.MemoryBarrier), but you do need at least one-way ordering of your memory operations when you interact with the ProducerConsumer class. The producer should do a release-store (Volatile.Write), and the consumers should do an acquire-load (Volatile.Read), so that the compiler/JIT maintains the order at compile time and emits instructions that stop the CPU from reordering at run time. Without this, a consumer could observe the incremented count and still read a stale array element, from before the producer's array[count] = value store became visible. As long as you have acquire/release synchronization between threads, and the count is only increasing, your class should be safe for single-producer/multiple-consumers scenarios.
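
    As a rough sketch of that release/acquire pairing, here is a hypothetical append-only variant of the class, without any wraparound. The AppendOnlyBuffer name, the TryAdd method and the capacity parameter are my own illustration and are not part of the question's code:

    ```csharp
    using System;
    using System.Threading;

    // Hypothetical append-only buffer: one producer, any number of consumers.
    public sealed class AppendOnlyBuffer
    {
        private readonly int[] _items;
        private int _count; // only ever grows; published with release/acquire semantics

        public AppendOnlyBuffer(int capacity) => _items = new int[capacity];

        // Not thread-safe: intended for a single producer.
        public bool TryAdd(int value)
        {
            int count = _count; // plain read is enough, only the producer writes _count
            if (count == _items.Length) return false; // full, and this sketch never resets

            _items[count] = value;                 // 1. store the element
            Volatile.Write(ref _count, count + 1); // 2. release-store that publishes it
            return true;
        }

        // Thread-safe for multiple consumers.
        public int[] ToArray()
        {
            int count = Volatile.Read(ref _count); // acquire-load
            var copy = new int[count];
            Array.Copy(_items, copy, count);       // elements [0, count) are already written
            return copy;
        }
    }
    ```

    The element store is a plain store because the release-store of the count that follows it is what orders the two; a consumer that sees the new count with an acquire-load also sees the element.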

    But apparently the count is not always increasing. When the array is full, the producer resets the count to 1. This is a huge problem that cannot be solved with barriers alone. The producer has no way of knowing when all consumers have finished reading the array, so it cannot know when it is safe to reset it. When you write multithreaded code, you should always keep in mind that the operating system can suspend any thread at any time, for a duration in the range of 10-30 milliseconds, and sometimes more. So a consumer that executes the line => array[..count]; can read the count, then get suspended for 30 msec, and then resume and use the obsolete count value to copy the first count items of the array. So it might read 4,500 items at a time when the current count is 500. This is unlikely to be what you want.
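
    To make that interleaving concrete, here is a single-threaded sketch that replays it step by step. The suspension is simulated simply by the order of the statements, and the values 1 and 2 stand for the producer's loop number; none of this is the question's actual code:

    ```csharp
    using System;

    int[] array = new int[5000];
    int count = 0;

    // Producer, loop 1: writes 4,500 items.
    while (count < 4500) array[count++] = 1;

    // Consumer: reads the count, then is (hypothetically) suspended
    // before it gets to copy anything.
    int staleCount = count;

    // Producer, meanwhile: wraps around and writes 500 items of loop 2.
    array[0] = 2;
    count = 1;
    while (count < 500) array[count++] = 2;

    // Consumer resumes and copies with the obsolete count.
    int[] snapshot = array[..staleCount];

    Console.WriteLine($"current count: {count}, copied: {snapshot.Length}");
    Console.WriteLine($"first: {snapshot[0]}, last: {snapshot[^1]}");
    ```

    This prints "current count: 500, copied: 4500" followed by "first: 2, last: 1". The consumer ends up with a mix of items from two different loops, and no memory barrier around the count can prevent it.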

    One way to solve this problem is to store the state of the ProducerConsumer in an internal State class, and implement the reset by swapping the old State with a new one. This way, any consumer that has grabbed a reference to a State will continue interacting with that State, even if it is suspended immediately after grabbing it. So it will read consistent data, although potentially a bit stale. Here is an example:

    using System.Threading; // for Volatile.Read and Volatile.Write

    public class ProducerConsumer
    {
        private State _state = new State();
    
        private class State
        {
            public readonly int[] Items = new int[5000];
            public int Count = 0;
        }
    
        // Not thread-safe (it is intended for a single producer)
        public void Add(int value)
        {
            if (_state.Count >= _state.Items.Length - 1)
            {
                State newState = new();
                newState.Items[0] = value;
                newState.Count = 1;
                Volatile.Write(ref _state, newState);
                return;
            }
            Volatile.Write(ref _state.Items[_state.Count], value);
            Volatile.Write(ref _state.Count, _state.Count + 1);
        }
    
        // Thread-safe (multiple consumers are OK)
        public int[] ToArray()
        {
            State state = Volatile.Read(ref _state);
            int count = Volatile.Read(ref state.Count);
            int[] array = new int[count];
            for (int i = 0; i < count; i++)
                array[i] = state.Items[i];
            return array;
        }
    }
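
    To see why the swap helps, here is a deterministic sketch that pauses a consumer between grabbing the State and copying from it. SwappingBuffer is a hypothetical, shrunk-down variant of the class above (capacity 4, reset only when completely full), and the Grab and Copy helpers exist only so that the demo can split a read into two steps:

    ```csharp
    using System;
    using System.Threading;

    var pc = new SwappingBuffer();
    for (int i = 1; i <= 4; i++) pc.Add(i);        // fills the first State with 1,2,3,4

    SwappingBuffer.State grabbed = pc.Grab();      // consumer grabs the State, then "is suspended"

    pc.Add(5);                                     // producer finds it full and swaps in a fresh State
    pc.Add(6);

    int[] stale = SwappingBuffer.Copy(grabbed);    // consumer resumes with the old State
    int[] current = SwappingBuffer.Copy(pc.Grab());

    Console.WriteLine(string.Join(",", stale));
    Console.WriteLine(string.Join(",", current));

    // Hypothetical demo class, not the answer's ProducerConsumer itself.
    public sealed class SwappingBuffer
    {
        public sealed class State
        {
            public readonly int[] Items = new int[4];
            public int Count;
        }

        private State _state = new State();

        // Not thread-safe: intended for a single producer.
        public void Add(int value)
        {
            State state = _state;
            if (state.Count == state.Items.Length)
            {
                // Full: publish a fresh State instead of resetting the old one.
                var fresh = new State();
                fresh.Items[0] = value;
                fresh.Count = 1;
                Volatile.Write(ref _state, fresh);
                return;
            }
            state.Items[state.Count] = value;
            Volatile.Write(ref state.Count, state.Count + 1);
        }

        // Step 1 of a read: grab the current State reference.
        public State Grab() => Volatile.Read(ref _state);

        // Step 2 of a read: copy the published prefix of that State.
        public static int[] Copy(State state)
        {
            int count = Volatile.Read(ref state.Count);
            var copy = new int[count];
            Array.Copy(state.Items, copy, count);
            return copy;
        }
    }
    ```

    The first line printed is 1,2,3,4 and the second is 5,6. The suspended consumer gets a stale but internally consistent snapshot, because the producer never touches a State again after replacing it.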
    

    In a previous version of this answer, each and every element of the array was read with volatile semantics: array[i] = Volatile.Read(ref state.Items[i]);. According to a comment by Peter Cordes, this is not needed:

    You don't need volatile reads of each state.Items[i]. You've already synchronized-with the thread that stored those items when you did int count = Volatile.Read(ref state.Count); (an acquire load that saw the value from a release store), so it's now safe to do non-atomic accesses to state.Items[0 .. count-1]; those array elements have already been written. The state reference/pointer came from one acquire-load, so we have a reference to an object that's at most being monotonically appended to, and the Count was written with a release store after the array elements.

    .NET contains an internal class named SingleProducerSingleConsumerQueue<T>, which is exactly what its name indicates. You could study its source code for educational purposes if you want.