Tags: c++, atomic, memory-barriers, stdatomic, compare-and-swap

Which memory barriers are minimally needed for updating array elements with greater values?


What would be the minimally needed memory barriers in the following scenario?

Several threads update the elements of an array int a[n] in parallel. All elements are initially set to zero. Each thread computes a new value for each element; then, it compares the computed new value to the existing value stored in the array, and writes the new value only if it is greater than the stored value. For example, if a thread computes for a[0] a new value 5, but a[0] is already 10, then the thread should not update a[0]. But if the thread computes a new value 10, and a[0] is 5, then the thread must update a[0].

The computation of the new values involves some shared read-only data; it does not involve the array at all.

While the above-mentioned threads are running, no other thread accesses the array. The array is consumed later, after all the threads are guaranteed to finish their updates.

The implementation uses a compare-and-swap loop, wrapping the elements into atomic_ref (either from Boost or from C++20):

for (int k = 0; k != n; ++k) // For each element of the array
{
    // Locally in this thread, compute the new value for a[k].
    int new_value = ComputeTheNewValue(k);

    // Establish atomic access to a[k].
    atomic_ref<int> memory(a[k]);

    // [Barrier_0]: Read the existing value.
    int existing_value = memory.load(memory_order_relaxed);

    while (true) // The compare-and-swap loop.
    {
        // Overwrite only with higher values.
        if (new_value <= existing_value)
            break;

        // Need to update a[k] with the higher value "new_value", but
        // only if a[k] still stores the "existing_value".
        if (memory.compare_exchange_weak(existing_value, new_value,
                   /*Barrier_1*/         memory_order_relaxed,
                   /*Barrier_2*/         memory_order_relaxed))
        {
            // a[k] was still storing "existing_value", and it has been
            // successfully updated with the higher "new_value".
            // We're done, and we may exit the compare-and-swap loop.
            break;
        }
        else
        {
            // We get here in two cases:
            //  1. a[k] was found to store a value different from "existing_value", or
            //  2. the compare-and-swap operation has failed spuriously.
            // In the first case, the new value stored in a[k] has been loaded
            // by compare_exchange_weak() function into the "existing_value" variable.
            // Then, we need to compare the "new_value" produced by this thread
            // with the newly loaded "existing_value". This is achieved by simply continuing the loop.
            // The second case (the spurious failure) is also handled by continuing the loop,
            // although in that case the "new_value <= existing_value" comparison is redundant.
            continue;
        }
    }

}

This code involves three memory barriers:

  1. Barrier_0 in memory.load().
  2. Barrier_1, to use in read-modify-write when compare_exchange_weak() succeeds.
  3. Barrier_2, to use in load operation when compare_exchange_weak() fails.

In this scenario, is the code guaranteed to update only with higher values when all three barriers are set to relaxed? If not, what minimal barriers are needed to guarantee the correct behavior?


Solution

  • Relaxed is fine; you don't need any ordering with respect to accesses to any other elements during the process of updating. And for accesses to the same location, ISO C++ guarantees that a "modification order" exists for each location separately, and that even relaxed loads and RMWs never see a value earlier in that order than one they have already loaded or RMWed (read-read coherence).

    You're just building an atomic fetch_max primitive out of a CAS retry loop. Since the other writers are doing the same thing, the value of each location is monotonically increasing. So it's totally safe to bail out any time you see a value greater than the new_value.
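
    As a compact illustration, the whole loop can be wrapped into a small fetch_max-style helper. This is a minimal sketch assuming C++20 std::atomic_ref; the function name is illustrative, not a standard API:

    #include <atomic>

    // Hypothetical helper: raise the value stored in "slot" to "new_value",
    // but only if "new_value" is greater, using only relaxed operations.
    void atomic_fetch_max_relaxed(int& slot, int new_value)
    {
        std::atomic_ref<int> memory(slot);
        int existing_value = memory.load(std::memory_order_relaxed);
        // The location only ever increases, so we can stop as soon as we
        // observe a value that is already >= new_value.
        while (new_value > existing_value &&
               !memory.compare_exchange_weak(existing_value, new_value,
                                             std::memory_order_relaxed,
                                             std::memory_order_relaxed))
        {
            // On failure, compare_exchange_weak has reloaded existing_value;
            // the loop condition re-checks it.
        }
    }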

    For the main thread to collect the results at the end, you do need release/acquire synchronization, like thread::join or some kind of flag. (e.g. maybe a fetch_sub(1, release) on a counter of how many threads still have work left to do, or an array of done flags so you can just do a pure store.)
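
    A minimal sketch of the counter idea; the names (threads_remaining, worker, collect_results) and the busy-wait are illustrative, not taken from the question:

    #include <atomic>
    #include <thread>

    std::atomic<int> threads_remaining;

    void worker(/* ... */)
    {
        // ... do the relaxed max-updates on the shared array ...

        // Announce completion. The release here pairs with the acquire
        // load below, publishing this thread's stores to the array.
        threads_remaining.fetch_sub(1, std::memory_order_release);
    }

    void collect_results(int n_threads)
    {
        threads_remaining.store(n_threads, std::memory_order_relaxed);
        // ... launch n_threads workers ...

        // Wait for every worker. Reading 0 with acquire synchronizes with
        // each worker's release fetch_sub (the final decrement is part of
        // every worker's release sequence), so all array updates are visible.
        while (threads_remaining.load(std::memory_order_acquire) != 0)
            std::this_thread::yield();

        // The array can now be read non-atomically.
    }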


    BTW, this seems likely to be slow, with lots of time spent waiting for cache lines to bounce between cores. (Lots of false sharing.) Ideally you can efficiently change this to have each thread work on different parts of the array (e.g. computing multiple candidates for the same index so it doesn't need any atomic stuff).

    (The asker replied in a comment:) I cannot guarantee that the computed indices do not overlap. In practice, the overlap is usually small, but it cannot be eliminated.

    So apparently that's a no. Still, if the indices touched by different threads are in different cache lines (chunks of 16 int32_t), there won't be too much false sharing. (Also, if the computation is expensive enough that you aren't producing values very fast, that's good: the atomic updates won't be what your code spends most of its time on.)

    But if there is significant contention and the array isn't huge, you could give each thread its own output array, and collect the results at the end. e.g. have one thread do a[i] = max(a[i], b[i], c[i], d[i]) for 4 to 8 arrays per loop. (Not too many read streams at once, and not a variable number of inputs, because that probably couldn't compile efficiently.) This should benefit from SIMD, e.g. SSE4.1 pmaxsd doing 4 parallel max operations, so it should be limited mostly by L3 cache bandwidth.
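
    A sketch of that combining pass, with a fixed set of four input arrays; the array names and signature are illustrative:

    #include <algorithm>
    #include <cstddef>

    // Combine per-thread results: a[i] becomes the max of the four arrays.
    // Plain loads/stores, no atomics; the loop is contiguous and should
    // auto-vectorize (e.g. to SSE4.1 pmaxsd / AVX2 vpmaxsd on x86).
    void reduce_max4(int* a, const int* b, const int* c, const int* d,
                     std::size_t n)
    {
        for (std::size_t i = 0; i < n; ++i)
            a[i] = std::max({a[i], b[i], c[i], d[i]});
    }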

    Or divide the max work between threads as a second parallel phase, with each thread doing the above over part of the output array. Or have the threads with thread_id % 4 == 0 reduce the results from themselves and the next 3 threads, so you get a tree of reductions on a system with many threads.
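
    A sketch of that second phase, reusing reduce_max4 from the previous sketch: each thread combines its own contiguous slice of the output, so no index is shared and no atomics are needed (the chunking and names are illustrative):

    #include <algorithm>
    #include <cstddef>
    #include <thread>
    #include <vector>

    void parallel_reduce(int* a, const int* b, const int* c, const int* d,
                         std::size_t n, unsigned n_threads)
    {
        std::vector<std::thread> pool;
        std::size_t chunk = (n + n_threads - 1) / n_threads;
        for (unsigned t = 0; t < n_threads; ++t)
        {
            std::size_t begin = t * chunk;
            std::size_t end = std::min(n, begin + chunk);
            if (begin >= end)
                break;
            // Each thread reduces only [begin, end), so slices don't overlap.
            pool.emplace_back([=] {
                reduce_max4(a + begin, b + begin, c + begin, d + begin,
                            end - begin);
            });
        }
        // join() provides the release/acquire synchronization needed before
        // the combined array is read.
        for (auto& th : pool)
            th.join();
    }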