Search code examples
c++multithreadingatomicbarrier

Pausing threads for synchronized operation


I have a situation where N threads are working on a data structure simultaneously in small incremental steps. However, sometimes a synchronized action needs to take place.

So all threads need to halt, wait for one of the threads to perform this operation, and continue. I'm looking for a method that has as little as possible impact on the threads when the synchronized action is NOT required.

A simple option is using a shared_mutex, but I think an option with lower overhead is possible. I've attempted my own solution using a barrier and an atomic below.

So my questions are: Is this an effective solution for the problem? Is there a better solution?

#include <vector>
#include <thread>
#include <atomic>
#include <barrier>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::barrier barrier { nr_threads };
    std::atomic_bool sync_required { false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (int i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            while (true)
            {
                if (sync_required)
                {
                    if (i == 0)
                    {
                        barrier.arrive_and_wait();
                        sync_required = false;
                        // solo synchronized work
                        barrier.arrive_and_wait();
                    }
                    else
                    {
                        barrier.arrive_and_wait();
                        barrier.arrive_and_wait();
                    }
                }

                // standard loop body ...

                // sometimes a global synchronized action is required
                if (rare_condition()) sync_required = true;
            }
        });
    }

    // eventually ... treads quit

    for (auto& thread : threads) 
    {
        thread.join();
    }
}

Another solution with the shared_mutex needs a condition variable?

#include <array>
#include <atomic>
#include <thread>
#include <vector>
#include <barrier>
#include <shared_mutex>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::shared_mutex sync_mtx;
    std::atomic_bool sync_required { false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (int i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            std::shared_lock shared_lock { sync_mtx };

            while (true)
            {
                // very rarely another thread requires all the others to stop for a bit
                if (sync_required)
                {
                    if (i == 0)
                    {
                        // unlock shared, but lock unique, seems a little odd but neccesary
                        shared_lock.unlock();
                        {
                            std::unique_lock unique_lock{ shared_lock };
                            sync_required = false;
                            // solo sync work
                        }
                        shared_lock.lock();
                    }
                    else
                    {
                        shared_lock.unlock();
                        // todo: need condition variable, which adds more complexity to this solution?
                        shared_lock.lock();
                    }
                }

                // sometimes a global syncronized action is required
                sync_required = sync_required || rare_condition();
            }
        });
    }

    // eventually ... treads quit

    for (auto& thread : threads) 
    {
        thread.join();
    }
}

Solution

  • The major optimization I'll propose to your code is to replace the following

    // Thread: if (sync_required) {
    if (i == 0) { // An arbitrary thread is choosen to run sync().
        // It has to wait until other threads aknowledges having seen sync_required
        barrier.arrive_and_wait();
        sync();
    

    By something like this

    // In main:
    std::atomic<int> unpaused_count{ nr_threads }; // number of threads that haven't noticed sync_required
    
    // Thread: if (sync_required) {
    int old_unpaused_count = unpaused_count.fetch_sub(1); // This thread just noticed sync_required, update the count
    if (old_unpaused_count == 1) { // The thread noticing last is choosen to run sync()
        // It can run sync() immediately, since it knows that all other
        // threads have seen sync_required, and have decided to wait.
        sync();
        // TODO: restore unpaused_count & resume other threads.
    } else { // This thread isn't the last to notice: wait.
    

    If the targeted platform has a wait-free std::atomic<int>::fetch_sub (hopefully the case with all standard libraries on x64), this code can now choose a thread to run sync() in a wait-free manner, and the chosen thread starts running sync() immediately. This should be much better than a lock/barrier, if I didn't mess that bit of lock-free code (that's a big IF).

    My second suggestion is to use std::counting_semaphore to let the sync() thread notify the waiting threads that the sync period is over. The use case is well described on cppreference:

    Semaphores are also often used for the semantics of signaling/notifying rather than mutual exclusion, by initializing the semaphore with ​0​ and thus blocking the receiver(s) that try to acquire(), until the notifier "signals" by invoking release(n). In this respect semaphores can be considered alternatives to std::condition_variables, often with better performance.

    With the two proposed optimizations, the thread chosen to run sync() never needs to acquire a lock, or wait on some barrier/condition variable. This is highly desirable: the faster it finishes its sync() section, the faster the whole system can restart.

    Finally, full code (godbolt):

    #include <atomic>
    #include <limits>
    #include <semaphore>
    #include <thread>
    #include <vector>
    
    using ThreadCount = int;
    static constexpr auto maxThreadCount = std::numeric_limits<ThreadCount>::max();
    
    int main() {
        const ThreadCount nr_threads = 10;
        std::vector<std::thread> threads;
    
        struct SharedVars {
            std::counting_semaphore<maxThreadCount> end_of_sync{ 0 };
            std::atomic<bool> sync_required{ false };
            std::atomic<ThreadCount> unpaused_count{ nr_threads };
        };
        SharedVars shared;
    
        auto rare_condition = []() { return std::rand() == 42; };
    
        for (ThreadCount i = 0; i < nr_threads; ++i) {
            threads.emplace_back([&shared, rare_condition]() {
                while (true) {
                    if (shared.sync_required) {
                        ThreadCount old_unpaused_count = shared.unpaused_count.fetch_sub(1);
                        if (old_unpaused_count == 1) {
                            // SYNC section here
                            shared.unpaused_count.store(nr_threads);
                            shared.sync_required.store(false);
                            shared.end_of_sync.release(nr_threads - 1);
                        } else {
                            shared.end_of_sync.acquire();
                        }
                    }
                    // standard loop body ...
    
                    if (rare_condition()) {
                        shared.sync_required = true;
                    }
                }
            });
        }
    
        for (auto& thread : threads) {
            thread.join();
        }
    }
    

    It is certainly possible to use a weaker memory order than SEQ_CST on some of the atomic operations. Reasoning about weaker memory orders is far above my skill, so I'm leaving it this way.