Search code examples
c#multithreadinginterlocked

Interlocked.Increment not thread safe?


I found a compiler bug in just one line of code:

int thisIndex = Interlocked.Increment(ref messagesIndex) & indexMask;

The definitions are:

static int messagesIndex = -1;
public const int MaxMessages = 0x10000;
const int indexMask = MaxMessages-1;

messagesIndex is not accessed by any other line of code.

If I run that code billions of times in a single thread, I don't get any errors.

If I run the above line on several threads, I get the same number twice and another number gets skipped every 1x-thousand times.

The following line I have run billions of times on 6 threads without ever getting an error:

int thisIndex = Interlocked.Increment(ref messagesIndex);

Conclusion and Question

It seems that Interlocked.Increment() on its own works as expected, but Interlocked.Increment() & indexMask does not :-(

Any idea how I can make it work properly all the time, not just 99.99% ?

I tried to assign Interlocked.Increment(ref messagesIndex) to a volatile integer variable and do the "& indexMask" operation on that variable:

[ThreadStatic]
volatile static int nextIncrement;

nextIncrement = Interlocked.Increment(ref mainIndexIncrementModTest);
indexes[testThreadIndex++] = nextIncrement & maskIncrementModTest;

It causes the same problem like when I write it in 1 line.

Disassembly

Maybe someone can guess from the disassembly what problem the compiler introduces:

indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementTest);
0000009a  mov         eax, dword ptr [ebp-48h] 
0000009d  mov         dword ptr [ebp-58h], eax 
000000a0  inc         dword ptr [ebp-48h] 
000000a3  mov         eax, dword ptr [ebp-44h] 
000000a6  mov         dword ptr [ebp-5Ch], eax 
000000a9  lea         ecx, ds:[00198F84h] 
000000af  call        6D758403 
000000b4  mov         dword ptr [ebp-60h], eax 
000000b7  mov         eax, dword ptr [ebp-58h] 

000000ba  mov         edx, dword ptr [ebp-5Ch] 
000000bd  cmp         eax, dword ptr [edx+4] 
000000c0  jb          000000C7 
000000c2  call        6D9C2804 
000000c7  mov         ecx, dword ptr [ebp-60h] 
000000ca  mov         dword ptr [edx+eax*4+8], ecx 

indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementModTest) & maskIncrementModTest;
0000009a  mov         eax, dword ptr [ebp-48h] 
0000009d  mov         dword ptr [ebp-58h], eax 
000000a0  inc         dword ptr [ebp-48h] 
000000a3  mov         eax, dword ptr [ebp-44h] 
000000a6  mov         dword ptr [ebp-5Ch], eax 
000000a9  lea         ecx,ds:[001D8F88h] 
000000af  call        6D947C8B 
000000b4  mov         dword ptr [ebp-60h], eax 
000000b7  mov         eax, dword ptr [ebp-60h] 
000000ba  and         eax, 0FFFh 
000000bf  mov         edx, dword ptr [ebp-58h] 
000000c2  mov         ecx, dword ptr [ebp-5Ch] 
000000c5  cmp         edx, dword ptr [ecx+4] 
000000c8  jb          000000CF 
000000ca  call        6DBB208C 
000000cf  mov         dword ptr [ecx+edx*4+8], eax 

Bug Detection

To discover the bug, I run the problem line in 6 threads endlessly and each thread writes the returned integers in huge integer arrays. After some time, I stop the threads and search all six integer arrays if every number is returned exactly once (of course, I allow for the "& indexMask" operation).

using System;
using System.Text;
using System.Threading;


namespace RealTimeTracer 
{
    class Test 
    {
        #region Test Increment Multi Threads
        //      ----------------------------

        const int maxThreadIndexIncrementTest = 0x200000;
        static int mainIndexIncrementTest = -1; //the counter gets incremented before its use
        static int[][] threadIndexThraces;

        private static void testIncrementMultiThread() 
        {
            const int maxTestThreads = 6;

            Thread.CurrentThread.Name = "MainThread";

            //start writer test threads
            Console.WriteLine("start " + maxTestThreads + " test writer threads.");
            Thread[] testThreads = testThreads = new Thread[maxTestThreads];
            threadIndexThraces = new int[maxTestThreads][];
            int testcycle = 0;

            do 
            {
                testcycle++;
                Console.WriteLine("testcycle " + testcycle);
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++) 
                {
                    Thread testThread = new Thread(testIncrementThreadBody);
                    testThread.Name = "TestThread " + testThreadIndex;
                    testThreads[testThreadIndex] = testThread;
                    threadIndexThraces[testThreadIndex] = new int[maxThreadIndexIncrementTest+1]; //last int will be never used, but easier for programming
                }

                mainIndexIncrementTest = -1; //the counter gets incremented before its use
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++) 
                {
                    testThreads[testThreadIndex].Start(testThreadIndex);
                }

                //wait for writer test threads
                Console.WriteLine("wait for writer threads.");

                foreach (Thread testThread in testThreads)
                {
                    testThread.Join();
                }

                //verify that EVERY index is used exactly by one thread.
                Console.WriteLine("Verify");
                int[] threadIndexes = new int[maxTestThreads];

                for (int counter = 0; counter < mainIndexIncrementTest; counter++) 
                {
                    int threadIndex = 0;
                    for (; threadIndex < maxTestThreads; threadIndex++) 
                    {
                        if (threadIndexThraces[threadIndex][threadIndexes[threadIndex]]==counter) 
                        {
                            threadIndexes[threadIndex]++;
                            break;
                        }
                    }

                    if (threadIndex==maxTestThreads) 
                    {
                        throw new Exception("Could not find index: " + counter);
                    }
                }
            } while (!Console.KeyAvailable);
        }

        public static void testIncrementThreadBody(object threadNoObject)
        {
            int threadNo = (int)threadNoObject;
            int[] indexes = threadIndexThraces[threadNo];
            int testThreadIndex = 0;
            try
            {
                for (int counter = 0; counter < maxThreadIndexIncrementTest; counter++)      
                {
                    indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementTest);
                }
            } 
            catch (Exception ex) 
            {
                OneTimeTracer.Trace(Thread.CurrentThread.Name + ex.Message);
            }
        }
        #endregion


        #region Test Increment Mod Multi Threads
        //      --------------------------------

        const int maxThreadIndexIncrementModTest = 0x200000;
        static int mainIndexIncrementModTest = -1; //the counter gets incremented before its use
        const int maxIncrementModTest = 0x1000;
        const int maskIncrementModTest = maxIncrementModTest - 1;


        private static void testIncrementModMultiThread() 
        {
            const int maxTestThreads = 6;

            Thread.CurrentThread.Name = "MainThread";

            //start writer test threads 
            Console.WriteLine("start " + maxTestThreads + " test writer threads.");
            Thread[] testThreads = testThreads = new Thread[maxTestThreads];
            threadIndexThraces = new int[maxTestThreads][];
            int testcycle = 0;
            do 
            {
                testcycle++;
                Console.WriteLine("testcycle " + testcycle);
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++)
                {
                    Thread testThread = new Thread(testIncrementModThreadBody);
                    testThread.Name = "TestThread " + testThreadIndex;
                    testThreads[testThreadIndex] = testThread;
                    threadIndexThraces[testThreadIndex] = new int[maxThreadIndexIncrementModTest+1]; //last int will be never used, but easier for programming
                }

                mainIndexIncrementModTest = -1; //the counter gets incremented before its use
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++) 
                {
                    testThreads[testThreadIndex].Start(testThreadIndex);
                }

                //wait for writer test threads
                Console.WriteLine("wait for writer threads.");
                foreach (Thread testThread in testThreads) 
                {
                    testThread.Join();
                }

                //verify that EVERY index is used exactly by one thread.
                Console.WriteLine("Verify");
                int[] threadIndexes = new int[maxTestThreads];
                int expectedIncrement = 0;
                for (int counter = 0; counter < mainIndexIncrementModTest; counter++) 
                {
                    int threadIndex = 0;
                    for (; threadIndex < maxTestThreads; threadIndex++) 
                    {
                        if (threadIndexes[threadIndex]<maxThreadIndexIncrementModTest     && 
threadIndexThraces[threadIndex][threadIndexes[threadIndex]]==expectedIncrement) 
                        {
                            threadIndexes[threadIndex]++;
                            expectedIncrement++;
                            if (expectedIncrement==maxIncrementModTest) 
                            {
                                expectedIncrement = 0;
                            }
                            break;
                        }
                    }

                    if (threadIndex==maxTestThreads) 
                    {
                        StringBuilder stringBuilder = new StringBuilder();
                        for (int threadErrorIndex = 0; threadErrorIndex < maxTestThreads; threadErrorIndex++)
                        {
                            int index = threadIndexes[threadErrorIndex];
                            if (index<0) 
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + " is empty");
                            }
                            else if (index==0)
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + "[0]=" +
                                threadIndexThraces[threadErrorIndex][0]);
                            }
                            else if (index>=maxThreadIndexIncrementModTest) 
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + "[" + (index-1) + "]=" +
                threadIndexThraces[threadErrorIndex][maxThreadIndexIncrementModTest-2] + ", " + 
                threadIndexThraces[threadErrorIndex][maxThreadIndexIncrementModTest-1]);
                            } 
                            else 
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + "[" + (index-1) + "]=" +
                threadIndexThraces[threadErrorIndex][index-1] + ", " + 
                threadIndexThraces[threadErrorIndex][index]);
                            }
                        } 

                        string exceptionString = "Could not find index: " + expectedIncrement + " for counter " + counter + Environment.NewLine + stringBuilder.ToString();
                        Console.WriteLine(exceptionString);

                        return;
                        //throw new Exception(exceptionString);
                    }
                }
            } while (!Console.KeyAvailable);
        }


        public static void testIncrementModThreadBody(object threadNoObject)
        {
            int threadNo = (int)threadNoObject;
            int[] indexes = threadIndexThraces[threadNo];
            int testThreadIndex = 0;
            try
            {
                for (int counter = 0; counter < maxThreadIndexIncrementModTest; counter++) 
                {
                    // indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementModTest) & maskIncrementModTest;
                    int nextIncrement = Interlocked.Increment(ref mainIndexIncrementModTest);
                    indexes[testThreadIndex++] = nextIncrement & maskIncrementModTest;
                }
            } 
            catch (Exception ex) 
            {
                OneTimeTracer.Trace(Thread.CurrentThread.Name + ex.Message);
            }
        }
        #endregion
    }
}

This produces the following error:

Content of the 6 int-arrays (1 per thread)
Thread 0[30851]=2637, 2641
Thread 1[31214]=2639, 2644
Thread 2[48244]=2638, 2643
Thread 3[26512]=2635, 2642
Thread 4[0]=2636, 2775
Thread 5[9173]=2629, 2636

Explanation:
Thread 4 uses 2636
Thread 5 also uses 2636 !!!! This should never happen
Thread 0 used 2637
Thread 2 used 2638
Thread 1 used 2639
2640 is not used by any thread !!! That's the error the test detects
Thread 0 used 2641
Thread 3 used 2642


Solution

  • It is not Interlocked which is wrong. There is also no race condition. It does not even have something to do with multithreading.

    • Interlocked.Increment(ref variable) & Mask

    There is no race condition and no problem with atomicity as it was speculated. Interlocked returns the read value in a register (eax). The masking happens on the read value inside a register which has nothing to do with memory models or atomicity. Registers and local variables cannot be seen from other threads and hence not interfere.

    • Testing

    You are verifying that all values are seen by using a int[nThreads] where you check each thread if it has seen the value at index n and assume that the next value must be seen on this or any other thread.

    Console.WriteLine("Verify");
    int[] threadIndexes = new int[nThreads];
    
    for (int counter = 0; counter < GlobalCounter; counter++) 
    {
        int nThread = 0;
        for (; nThread < nThreads; nThread++) 
        {
            if (ThreadArrays[nThread][threadIndexes[nThread]]==counter) 
            {
                threadIndexes[nThread]++;
                break;
            }
        }
    
        if (nThread==nThreads) 
        {
            throw new Exception("Could not find index: " + counter);
        }
    }
    

    I have renamed the variables to get a more clear naming. I have changed the masked test that the bitmasking is not done in the thread but during verify time which also asserts. This shows that you have a logic problem in your test code. The threaded function does only store the incremented value as in your not masked test.

    //verify that EVERY index is used exactly by one thread.
    Console.WriteLine("Verify");
    int[] threadIndexes = new int[nThreads];
    int expectedIncrement = 0;
    for (int counter = 0; counter < GlobalCounter; counter++) 
    {
        int threadIndex = 0;
        for (; threadIndex < nThreads; threadIndex++) 
        {
            if (threadIndexes[threadIndex]<LoopCount && (ThreadArrays[threadIndex][threadIndexes[threadIndex]] & MaxIncrementBitMask)==expectedIncrement) 
            {
                threadIndexes[threadIndex]++;
                expectedIncrement++;
                if (expectedIncrement == MaxIncrementBit)
                {
                    expectedIncrement = 0;
                }
                break;
            }
        }
    
        if (threadIndex==nThreads) 
        {
    

    Your check breaks when there is a wraparound in the values. E.g. 0 is the first value. At 0x1000 & 0xFFF it is again 0. Now it can happen that you account some of the wrapped values to the wrong thread and you break the implicit assumption that each thread has only unique values. In the debugger I see e.g. for the value 8

    threadIndexes [0] = 1
    threadIndexes [1] = 4
    threadIndexes [2] = 0
    threadIndexes [3] = 1
    threadIndexes [4] = 1
    threadIndexes [5] = 1
    

    although you must account the first 8 values to threadIndexes[1] which is the first thread which starts counting from 0 to several thousands.

    To sum it up: Interlocked and masking works. Your test is flawed and perhaps some of your code relying on invalid assumptions.