c# multithreading lock-free

Read and write float type between two threads in C#


I have two threads that share a float value. Thread1 writes the value and Thread2 reads it. Both threads start at the same time.

 private const int BUFFER_SIZE = 65536 * 8;
 private float _readWriteValue = 0;
 private bool _stop = false;

 private void Thread1()
 {
     Random ran = new Random();
     while (!_stop)
     {
         _readWriteValue = ran.Next(1, 100);
     }
 }
 private void Thread2()
 {
     while (!_stop)
     {
         float[] BufferData  = new float[BUFFER_SIZE];
         for (int i = 0; i < BUFFER_SIZE; i++)
         {
             BufferData[i] = _readWriteValue;
         }
         ProcessMethod(BufferData);
     }
 }

 private void ProcessMethod(float[] data)
 {
     // Do Something
 }

When I check BufferData, it is filled with only one number, for example only with 22. It seems that while Thread2 is in the for loop, _readWriteValue is locked and Thread1 cannot write a new value to it.

I'm trying to find a solution. I have tried lock, Monitor, and ConcurrentQueue, but I get the same result every time: BufferData is filled with only one number.

Do I misunderstand multithreading? What should I do?



Solution

  • The short version is that you need a signaling mechanism such as AutoResetEvent, or a thread-safe queue between the threads such as BlockingCollection or Channel. Locks and volatile aren't enough.

    _readWriteValue isn't locked. The loop in Thread2 runs a lot faster than the call to ran.Next(1, 100), so the entire loop can finish before ran.Next(1, 100) has had a chance to run a second time. Even if you add volatile to prevent some optimizations, _readWriteValue still holds the same value while the loop is running.

    This is called a race condition. The execution result depends entirely on how fast each thread is running. To fix this you need a way of synchronizing the writer, Thread1, and reader, Thread2.

    This case is trickier than usual because the reader has to read every value produced by the writer, each exactly once. This requires two-way synchronization between the reader and writer.

    Synchronizing Reads/Writes with events

    To avoid this problem, a way is needed to ensure that the reader (Thread2) can only read a value after the writer (Thread1) writes it, and that the writer can't write the value until the reader reads the previous one.

    One way to do this is to use the AutoResetEvent class. A thread can wait on an AutoResetEvent until another thread signals it. When that happens, the waiting thread continues and the event is automatically reset.

    One event is used to signal that Thread1 produced a value and another that Thread2 consumed it. This snippet doesn't repeat numbers:

    static readonly AutoResetEvent _thread1Step = new AutoResetEvent(false);
    static readonly AutoResetEvent _thread2Step = new AutoResetEvent(true);
    
    private static void Thread1()
    {
        Random ran = new Random(100);
        while (!_stop)
        {
            
            _thread2Step.WaitOne();
            _readWriteValue = ran.Next(1, 100);
            _thread1Step.Set();
        }
    }
    
    private static void Thread2()
    {
        while (!_stop)
        {
            
            const int BUFFER_SIZE = 100;
            float[] bufferData = new float[BUFFER_SIZE];
            for (int i = 0; i < BUFFER_SIZE; i++)
            {
                _thread1Step.WaitOne();
                bufferData[i] = _readWriteValue;
                _thread2Step.Set();
            }
            // Print statistics
            Console.WriteLine(String.Join(", ", bufferData));
            Thread.Sleep(10);
        }
    }
    

    This prints

    96, 16, 67, 90, 36, 94, 71, 61, 35, 15, 96, 54, 38, 50, 97, 48, 89, 99, 39, 4, 11, 38, 89, 96, 50, 84, 91, 65, 17, 62, 11, 79, 51, 50, 56, 10, 86, 53, 47, 74, 28, 50, 34, 59, 63, 86, 36, 3, 91, 44, 3, 79, 30, 86, 22, 59, 27, 70, 40, 51, 4, 7, 44, 73, 5, 17, 3, 88, 93, 88, 62, 37, 53, 64, 76, 60, 5, 30, 34, 64, 49, 88, 73, 73, 59, 32, 50, 65, 28, 97, 83, 17, 13, 95, 70, 21, 6, 60, 55, 46
    83, 48, 9, 4, 82, 66, 26, 65, 10, 61, 54, 96, 36, 76, 3, 15, 33, 71, 15, 73, 67, 38, 60, 96, 6, 45, 24, 57, 94, 55, 55, 45, 75, 87, 81, 1, 79, 70, 90, 92, 6, 85, 55, 66, 44, 86, 80, 37, 93, 55, 88, 34, 46, 81, 78, 45, 87, 13, 98, 37, 42, 69, 70, 55, 7, 10, 21, 49, 94, 2, 36, 63, 80, 22, 67, 82, 83, 93, 53, 19, 65, 87, 63, 40, 67, 21, 98, 93, 9, 36, 14, 67, 72, 54, 51, 36, 15, 99, 60, 34
    66, 32, 42, 91, 20, 25, 54, 25, 15, 96, 62, 93, 60, 79, 72, 55, 6, 31, 88, 85, 11, 27, 40, 58, 88, 68, 91, 27, 71, 31, 67, 83, 34, 19, 53, 33, 45, 72, 21, 42, 67, 74, 79, 13, 74, 20, 7, 92, 81, 95, 30, 84, 12, 74, 24, 39, 92, 83, 4, 51, 33, 28, 53, 83, 29, 78, 60, 42, 27, 39, 11, 34, 10, 46, 19, 36, 48, 27, 84, 68, 62, 99, 45, 75, 1, 83, 72, 59, 94, 14, 41, 61, 68, 69, 9, 39, 21, 96, 83, 96
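
To check that this handshake really delivers every value exactly once, here is a minimal, self-contained sketch of the same two-event pattern with a finite number of values. The names HandoffDemo, Run, produced and consumed are made up for this illustration, and the writer sends 0, 1, 2, ... instead of random numbers so the result is easy to verify.

```csharp
using System;
using System.Threading;

public static class HandoffDemo
{
    // Hypothetical helper: passes `count` values from a writer thread to a
    // reader thread through one shared variable, one value at a time.
    public static float[] Run(int count)
    {
        float shared = 0;
        using var produced = new AutoResetEvent(false); // writer -> reader
        using var consumed = new AutoResetEvent(true);  // reader -> writer, starts signaled
        var result = new float[count];

        var writer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
            {
                consumed.WaitOne(); // wait until the previous value has been read
                shared = i;         // write the next value
                produced.Set();     // tell the reader a fresh value is available
            }
        });

        var reader = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
            {
                produced.WaitOne(); // wait for a fresh value
                result[i] = shared; // read it exactly once
                consumed.Set();     // let the writer continue
            }
        });

        writer.Start();
        reader.Start();
        writer.Join();
        reader.Join();
        return result;
    }
}
```

Calling HandoffDemo.Run(1000) should return the values 0 to 999 in order, with no repeats and no gaps, because neither thread can get ahead of the other by more than one step.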
    

    Using a BlockingCollection works

    Coupling one thread to the other like this is not always a good idea, as each one has to wait for the other to proceed. If one slows down for any reason, so does the other. One way to avoid this is to use a thread-safe queue.

    Using a BlockingCollection, as another answer shows, works with only slight modifications:

    private static BlockingCollection<float> _queue=new BlockingCollection<float>(100);
    
    private static void Thread1()
    {
        Random ran = new Random();
        while (!_stop)
        {
            _queue.Add( ran.Next(1, 100));
        }
    }
    
    private static void Thread2()
    {
        while (!_stop)
        {
            const int BUFFER_SIZE = 100;
            float[] bufferData = new float[BUFFER_SIZE];
            for(int i=0;i<BUFFER_SIZE;i++)
            {
                bufferData[i] = _queue.Take();
            }
            // Print statistics
            Console.WriteLine(String.Join(", ", bufferData));
            Thread.Sleep(10);
        }
    }
    

    And prints

    18, 70, 21, 74, 38, 77, 75, 15, 16, 62, 97, 22, 57, 98, 37, 63, 26, 80, 47, 21, 50, 94, 7, 84, 84, 78, 11, 92, 5, 82, 37, 4, 25, 1, 21, 62, 46, 97, 83, 87, 72, 33, 99, 50, 59, 69, 33, 88, 65, 86, 55, 56, 77, 44, 6, 31, 31, 19, 13, 60, 38, 12, 67, 12, 88, 34, 18, 8, 94, 86, 26, 7, 20, 87, 32, 37, 93, 46, 82, 58, 44, 57, 89, 10, 26, 38, 51, 54, 42, 56, 16, 87, 66, 66, 79, 55, 50, 87, 18, 90
    94, 37, 77, 91, 62, 12, 48, 96, 91, 12, 84, 45, 80, 5, 32, 59, 4, 72, 47, 83, 88, 74, 63, 39, 62, 28, 32, 86, 88, 38, 85, 62, 18, 84, 82, 36, 42, 81, 20, 66, 36, 30, 92, 31, 62, 54, 68, 25, 84, 67, 31, 26, 16, 15, 32, 67, 22, 33, 22, 68, 68, 55, 6, 59, 81, 18, 6, 46, 10, 33, 73, 78, 65, 37, 84, 79, 34, 34, 51, 21, 6, 51, 80, 25, 53, 30, 50, 39, 53, 5, 1, 44, 36, 70, 57, 39, 67, 24, 37, 47
    43, 61, 86, 89, 87, 83, 29, 23, 24, 78, 28, 10, 39, 90, 22, 66, 23, 70, 51, 48, 83, 3, 23, 92, 29, 22, 30, 98, 96, 16, 96, 99, 71, 85, 42, 96, 47, 57, 4, 7, 98, 8, 28, 91, 38, 6, 27, 69, 93, 65, 42, 70, 22, 53, 67, 57, 36, 45, 81, 89, 63, 42, 52, 63, 59, 47, 33, 1, 66, 49, 9, 3, 46, 60, 6, 23, 76, 83, 48, 99, 61, 10, 4, 42, 22, 57, 15, 5, 9, 51, 89, 37, 40, 80, 61, 46, 56, 71, 24, 
    

    That's because the writer, Thread1, adds every value to a thread-safe queue and the reader, Thread2, takes them one by one. With a capacity of 100, no values are lost even if the writer is faster than the reader: once the collection is full, the writer blocks until the reader takes a value.

    If you set the capacity to just 1, you effectively run both threads in lockstep.
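
As a rough illustration of the capacity behaviour, the sketch below sends a finite number of values through a bounded BlockingCollection and collects them on the reader side. The names BoundedQueueDemo and Run are invented for this example. It also uses CompleteAdding and GetConsumingEnumerable, which the code above does not use, so that the reader ends on its own once the writer is done.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BoundedQueueDemo
{
    // Hypothetical helper: the writer adds 0..count-1, the reader collects them.
    public static List<float> Run(int count, int capacity)
    {
        using var queue = new BlockingCollection<float>(capacity);
        var received = new List<float>();

        var writer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
            {
                queue.Add(i);       // blocks while the collection is full
            }
            queue.CompleteAdding(); // no more values will be added
        });

        var reader = new Thread(() =>
        {
            // Blocks while the collection is empty and ends after
            // CompleteAdding has been called and all values were taken.
            foreach (float value in queue.GetConsumingEnumerable())
            {
                received.Add(value);
            }
        });

        writer.Start();
        reader.Start();
        writer.Join();
        reader.Join();
        return received;
    }
}
```

BoundedQueueDemo.Run(500, 1) and BoundedQueueDemo.Run(500, 100) should both return 0 to 499 in order. The capacity only changes how far the writer may run ahead of the reader, not which values arrive.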

    Using CancellationToken

    _stop itself needs synchronization too, because it is written by one thread and read by others. Instead of adding a lock, though, it's better to use the purpose-built CancellationToken type. This is a struct that can be passed to methods that support cancellation to tell them when to cancel. Almost all .NET methods that support cancellation accept a CancellationToken, and that includes BlockingCollection.Take:

    private static CancellationTokenSource _cts=new ();
    
    private static BlockingCollection<float> _queue=new (100);
    
    public static void Stop()
    {
        _cts.Cancel();
    }
    private static void Thread1()
    {
        Random ran = new Random();
        var token = _cts.Token;
    
        while (!token.IsCancellationRequested)
        {
            // Passing the token lets an Add that is blocked on a full queue be cancelled too
            _queue.Add(ran.Next(1, 100), token);
        }
    }
    
    private static void Thread2()
    {
        var token = _cts.Token;
    
        while (!token.IsCancellationRequested)
        {
            const int BUFFER_SIZE = 100;
            float[] bufferData = new float[BUFFER_SIZE];
            for(int i=0;i<BUFFER_SIZE;i++)
            {
                bufferData[i] = _queue.Take(token);
            }
            // Print statistics
            Console.WriteLine(String.Join(", ", bufferData));
            Thread.Sleep(10);
        }
    }
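
One detail worth keeping in mind: when the token is cancelled while Take(token) is waiting, Take throws an OperationCanceledException rather than returning. The sketch below shows one way to handle that. CancelDemo and ReadUntilCancelled are hypothetical names for this illustration, not part of the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CancelDemo
{
    // Hypothetical helper: takes values until the token is cancelled and
    // returns how many values were read.
    public static int ReadUntilCancelled(BlockingCollection<float> queue, CancellationToken token)
    {
        int read = 0;
        try
        {
            while (true)
            {
                queue.Take(token); // throws OperationCanceledException once cancelled
                read++;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown: leave the loop quietly
        }
        return read;
    }
}
```

For example, with three values already in the queue and a CancellationTokenSource that cancels after 200 ms, the method should read the three values, wait for more, and return 3 once the timeout fires. In a plain thread, an uncaught exception would end the process, so a try/catch around the loop body of Thread2 is probably a good idea.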
    

    The modern way: async/await

    The code can be simplified a lot by using tasks, async/await and Channels, the asynchronous equivalent to BlockingCollection. This time there's no need to have any fields. The channel and tokens can be passed to a Writer and Reader function as parameters:

    public static async Task Main()
    {
        var  cts=new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var channel=Channel.CreateBounded<float>(100);
    
        var task1=Task.Run(()=>Writer(channel.Writer,cts.Token));
        var task2=Task.Run(()=>Reader(channel.Reader,cts.Token));
        await Task.WhenAll(task1,task2);
    }
            
    
    private static async Task Writer(ChannelWriter<float> writer,CancellationToken token)
    {
        Random ran = new Random();
        while (!token.IsCancellationRequested)
        {
            var value = ran.Next(1, 100);
            await writer.WriteAsync(value,token);
        }
    }
    
    private static async Task Reader(ChannelReader<float> reader,CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {           
            const int BUFFER_SIZE = 100;
            float[] bufferData = new float[BUFFER_SIZE];
            for (int i = 0; i < BUFFER_SIZE; i++)
            {               
                bufferData[i] = await reader.ReadAsync(token);
            }
            // Print statistics
            Console.WriteLine(String.Join(", ", bufferData));
            await Task.Delay(10);
        }
    }
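
If the writer produces a finite number of values, a channel can also be shut down without a token: the writer calls Complete and the reader loops over ReadAllAsync until the channel is drained. The following is only a sketch under that assumption. ChannelDemo and RunAsync are invented names, and ReadAllAsync needs .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    // Hypothetical helper: the writer sends 0..count-1, the reader collects them.
    public static async Task<List<float>> RunAsync(int count, int capacity)
    {
        var channel = Channel.CreateBounded<float>(capacity);
        var received = new List<float>();

        var writer = Task.Run(async () =>
        {
            for (int i = 0; i < count; i++)
            {
                await channel.Writer.WriteAsync(i); // waits while the channel is full
            }
            channel.Writer.Complete();              // signals that no more values follow
        });

        var reader = Task.Run(async () =>
        {
            // The loop ends once the channel is completed and fully drained
            await foreach (float value in channel.Reader.ReadAllAsync())
            {
                received.Add(value);
            }
        });

        await Task.WhenAll(writer, reader);
        return received;
    }
}
```

Awaiting ChannelDemo.RunAsync(500, 100) should yield the values 0 to 499 in order, without either task throwing on shutdown.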
    

    The output is:

    54, 38, 99, 32, 82, 40, 50, 71, 39, 84, 69, 75, 69, 91, 70, 46, 20, 82, 72, 34, 27, 26, 79, 25, 48, 28, 84, 98, 4, 39, 99, 30, 11, 28, 47, 63, 89, 27, 35, 14, 92, 84, 33, 36, 69, 55, 65, 80, 59, 24, 63, 83, 6, 79, 18, 23, 5, 72, 47, 22, 36, 89, 51, 9, 57, 16, 50, 84, 35, 97, 35, 24, 36, 38, 99, 5, 31, 16, 14, 9, 62, 51, 67, 25, 29, 61, 20, 22, 31, 58, 93, 98, 97, 5, 89, 93, 56, 58, 74, 41
    7, 59, 61, 7, 19, 33, 37, 67, 57, 25, 99, 13, 80, 56, 46, 21, 2, 41, 97, 84, 11, 40, 11, 58, 12, 94, 1, 23, 79, 20, 15, 26, 40, 51, 51, 55, 42, 11, 34, 39, 5, 99, 1, 59, 5, 75, 8, 74, 93, 23, 66, 63, 41, 16, 71, 36, 88, 74, 61, 38, 63, 46, 50, 10, 26, 64, 25, 28, 28, 55, 82, 5, 80, 68, 52, 79, 37, 56, 80, 91, 67, 87, 3, 17, 87, 84, 87, 94, 55, 13, 64, 92, 60, 99, 64, 53, 72, 87, 14, 99
    64, 33, 93, 29, 14, 60, 48, 40, 46, 71, 5, 96, 12, 22, 50, 49, 36, 78, 27, 95, 24, 40, 8, 45, 29, 27, 6, 33, 99, 62, 84, 85, 29, 72, 5, 79, 73, 17, 88, 92, 38, 16, 39, 49, 66, 70, 37, 99, 64, 11, 31, 61, 3, 46, 21, 3, 71, 31, 68, 87, 14, 19, 24, 80, 93, 14, 97, 90, 13, 32, 90, 64, 97, 92, 37, 49, 66, 2, 32, 93, 57, 72, 97, 4, 75, 62, 80, 17, 73, 98, 32, 43, 6, 64, 38, 92, 73, 21, 55, 65
    

    Volatile doesn't work

    Even if compiler optimizations are disabled, Thread1 generates values a lot slower than Thread2 reads them.

    Compiler and CPU optimizations can rewrite the loop into code that reads _readWriteValue once and copies it to each location directly, perhaps even using SIMD operations to assign multiple locations at once. The volatile keyword tells the compiler to avoid such optimizations. It doesn't introduce any kind of locking or signaling.

    This code, taken almost intact from one of the other answers, prints each number many times in a row, much like your own code:

    using System;
    using System.Threading;
    using System.Linq;
    
    public class Program
    {
        private static volatile float _readWriteValue = 0;
        private static volatile bool _stop = false;
    
        public static void Main()
        {
            Timer timer = new(_ => _stop = true);
            timer.Change(200, Timeout.Infinite);
    
            Thread t1 = new(() => Thread1());
            Thread t2 = new(() => Thread2());
    
            t1.Start(); t2.Start();
            t1.Join(); t2.Join();
        }
    
        private static void Thread1()
        {
            Random ran = new Random();
            while (!_stop)
            {
                _readWriteValue = ran.Next(1, 100);
            }
        }
    
        private static void Thread2()
        {
            while (!_stop)
            {
                const int BUFFER_SIZE = 100;
                float[] bufferData = new float[BUFFER_SIZE];
                for (int i = 0; i < BUFFER_SIZE; i++)
                {
                    bufferData[i] = _readWriteValue;
                }
                // Print statistics
                Console.WriteLine(String.Join(", ", bufferData));
                Thread.Sleep(10);
            }
        }
    }
    

    Result:

    73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 73, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 69, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 48, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 74, 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, 79, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 66, 66, 66, 66
    4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 94, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 88, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 76, 40, 40, 40, 40
    24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 24, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 92, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 13, 13, 13, 13, 13
    50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 57, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 22, 22, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 54, 54
    59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 61, 61, 61, 61, 61, 61, 61, 61, 61, 61, 61, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 41, 41, 41, 41, 41, 41, 41, 41, 41, 41, 41, 41, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 75, 74, 74
    

    There are roughly 10-15 read operations for every write.

    A lock alone isn't enough

    The reason is the same - the threads run at different speeds and the reader will always read values faster than the writer can produce them.

    This variation of the code also repeats values:

    private static object _locker=new();
    
    private static void Thread1()
    {
        Random ran = new Random(100);
        while (!_stop)
        {
            lock(_locker)
            {
                _readWriteValue = ran.Next(1, 100);
            }
        }
    }
    
    private static void Thread2()
    {
        while (!_stop)
        {
            const int BUFFER_SIZE = 100;
            float[] bufferData = new float[BUFFER_SIZE];
            for (int i = 0; i < BUFFER_SIZE; i++)
            {
                lock(_locker)
                {
                    bufferData[i] = _readWriteValue;
                }
            }
            // Print statistics
            Console.WriteLine(String.Join(", ", bufferData));
            Thread.Sleep(10);
        }
    }

    This prints

    55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 55, 81, 81, 81, 81, 81, 81, 81, 81, 81, 81, 69, 69, 69, 69, 69, 69, 22, 22, 22, 22, 22, 22, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 21, 54, 54, 54, 58, 58, 58, 58, 83, 83, 83, 53, 53, 20, 20, 20, 20, 20, 20, 33, 33, 33, 33, 33, 33, 33, 33, 33, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39, 39
    29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 40, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 38, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71, 71
    43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 43, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 98, 8, 8, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59, 59