What is the C++ version of Java's CyclicBarrier?


In Java, multiple threads can wait for all the others at a certain point, so that none of them starts the next block of code before all the others have finished the first block:

CyclicBarrier barrier = new CyclicBarrier(2);

// thread 1
readA();
writeB();
barrier.await();
readB();
writeA();

// thread 2
readA();
writeB();
barrier.await();
readB();
writeA();

Is there an exact or easy conversion to C++?

OpenCL has a similar built-in:

readA();
writeB();
barrier(CLK_GLOBAL_MEM_FENCE);
readB();
writeA();

so all work-items in the same work-group wait for each other, but that only works in OpenCL C, which is a constrained dialect of C.


Solution

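  • Before C++20, the C++ standard library didn't have a cyclic barrier. C++20 added std::barrier (header <barrier>), which can be reused phase after phase like Java's CyclicBarrier. On older standards you have to write your own.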

    A company like Oracle or Microsoft can quickly decide what to add to their language's library. For C++, people have to come to an agreement, and it can take a while.

    256 threads is a lot. As with all performance-related questions, you need to measure the code to make an informed decision. With 256 threads I would be tempted to use 10 barriers that are synchronized by an 11th barrier. You need to measure to know if that's actually better.
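    A rough C++11 sketch of the "10 barriers synchronized by an 11th" idea follows. It assumes a leader-based wiring, which is one common way to build a hierarchical barrier. The answer doesn't say exactly how it would be connected. `SimpleBarrier`, `TwoLevelBarrier` and `RunTwoLevelDemo` are hypothetical names, and `SimpleBarrier` is just a generation-counter stand-in for any reusable barrier. Each thread first waits on its group's barrier. The group leader then waits on the top-level barrier. Finally the whole group waits again, so nobody leaves before the leader has seen every group arrive. This sketch has not been benchmarked, and whether it beats a single 256-party barrier is exactly what needs measuring.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical minimal reusable barrier (generation-counter variant).
class SimpleBarrier {
public:
    explicit SimpleBarrier(unsigned parties) : parties_(parties) {}

    void Await() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long long gen = generation_;
        if (++arrived_ == parties_) {
            arrived_ = 0;      // reset for the next cycle
            ++generation_;     // releases every thread waiting on `gen`
            cv_.notify_all();
        } else {
            // Predicate form: spurious wakeups just re-check and sleep again.
            cv_.wait(lock, [&] { return generation_ != gen; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    const unsigned parties_;
    unsigned arrived_ = 0;
    unsigned long long generation_ = 0;
};

// Hypothetical two-level barrier: threads are split into groups of
// `groupSize`; the first thread of each group leads, and only leaders
// meet on the top-level barrier. groupSize must be > 0.
class TwoLevelBarrier {
public:
    TwoLevelBarrier(unsigned numThreads, unsigned groupSize)
        : groupSize_(groupSize),
          top_((numThreads + groupSize - 1) / groupSize) {
        for (unsigned first = 0; first < numThreads; first += groupSize) {
            const unsigned members = std::min(groupSize, numThreads - first);
            groups_.push_back(std::unique_ptr<SimpleBarrier>(new SimpleBarrier(members)));
        }
    }

    void Await(unsigned threadId) {
        assert(threadId / groupSize_ < groups_.size());  // threadId in range
        SimpleBarrier& group = *groups_[threadId / groupSize_];
        group.Await();                  // 1. my whole group has arrived
        if (threadId % groupSize_ == 0)
            top_.Await();               // 2. leaders: every group has arrived
        group.Await();                  // 3. members wait for their leader
    }

private:
    const unsigned groupSize_;
    SimpleBarrier top_;
    std::vector<std::unique_ptr<SimpleBarrier>> groups_;
};

// Runs `rounds` phases; returns false if any thread ever got past the
// barrier before all threads had arrived in that phase.
bool RunTwoLevelDemo(unsigned numThreads, unsigned groupSize, unsigned rounds) {
    TwoLevelBarrier barrier(numThreads, groupSize);
    std::atomic<unsigned> arrivals(0);
    std::atomic<bool> ok(true);
    std::vector<std::thread> threads;
    for (unsigned id = 0; id < numThreads; ++id) {
        threads.emplace_back([&, id] {
            for (unsigned round = 0; round < rounds; ++round) {
                ++arrivals;                                // this phase's "work"
                barrier.Await(id);
                if (arrivals.load() < (round + 1) * numThreads)
                    ok = false;                            // left too early
            }
        });
    }
    for (std::thread& t : threads) t.join();
    return ok;
}
```

    Step 3 is what makes this a barrier for everyone. Without it, non-leaders would leave as soon as their own group had arrived.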

    Check out my C++ implementation of a cyclic barrier, inspired by Java. I wrote it a couple of years ago. I based it on someone else's (buggy) code that I found at http://studenti.ing.unipi.it/~s470694/a-cyclic-thread-barrier/ (the link doesn't work anymore). The code is really simple (no need to credit me). Of course, it's provided as is, with no warranties.

    // Modeled after the java cyclic barrier.
    // Allows n threads to synchronize.
    // Call Break() and join your threads before this object goes out of scope
    #pragma once
    
    #include <mutex>
    #include <condition_variable>
    
    
    class CyclicBarrier
    {
    public:
        explicit CyclicBarrier(unsigned numThreads)
            : m_numThreads(numThreads)
            , m_counts{ 0, 0 }
            , m_index(0)
            , m_disabled(false)
        { }
    
        CyclicBarrier(const CyclicBarrier&) = delete;
        CyclicBarrier(CyclicBarrier &&) = delete;
        CyclicBarrier & operator=(const CyclicBarrier&) = delete;
        CyclicBarrier & operator=(CyclicBarrier &&) = delete;
    
        // sync point
        void Await()
        {
            std::unique_lock<std::mutex> lock(m_requestsLock);
            if (m_disabled)
                return;
    
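            unsigned currentIndex = m_index;
            ++m_counts[currentIndex];
    
            if (m_counts[currentIndex] < m_numThreads)
            {
                // The predicate is re-checked after every wakeup, so a "spurious
                // wakeup" (returning from wait() without any notify) is harmless.
                m_condition.wait(lock, [&] { return m_counts[currentIndex] >= m_numThreads; });
            }
            else
            {
                // Last thread to arrive. Waiters of this cycle may not have woken
                // yet, so the next cycle uses the other counter: flip the index,
                // reset that counter, and release everyone waiting on this one.
                m_index ^= 1;
                m_counts[m_index] = 0;
                m_condition.notify_all();
            }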
        }
    
        // Call this to free current sleeping threads and prevent any further awaits.
        // After calling this, the object is no longer usable.
        void Break()
        {
            std::unique_lock<std::mutex> lock(m_requestsLock);
            m_disabled = true;
            m_counts[0] = m_numThreads;
            m_counts[1] = m_numThreads;
            m_condition.notify_all();
        }
    
    private:
        std::mutex     m_requestsLock;
        std::condition_variable m_condition;
        const unsigned m_numThreads;
        unsigned       m_counts[2];
        unsigned       m_index;
        bool           m_disabled;
    };
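    Here is a sketch of the question's two-thread Java snippet translated to C++11 with std::thread. To keep it self-contained, it uses a hypothetical `Barrier` class with the same `Await()` contract as the class above. Instead of two alternating counts, it tracks cycles with a generation counter. `RunQuestionPattern` and the arithmetic standing in for readA()/writeB()/readB()/writeA() are illustrative only.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical compact cyclic barrier: a generation number is bumped each
// time the barrier trips, so it can be reused indefinitely.
class Barrier {
public:
    explicit Barrier(unsigned parties) : parties_(parties) {}

    void Await() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long long gen = generation_;
        if (++arrived_ == parties_) {
            arrived_ = 0;       // ready for the next cycle
            ++generation_;      // wakes every thread waiting on `gen`
            cv_.notify_all();
        } else {
            cv_.wait(lock, [&] { return generation_ != gen; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    const unsigned parties_;
    unsigned arrived_ = 0;
    unsigned long long generation_ = 0;
};

// The question's pattern with two threads; returns the final contents of A.
std::vector<unsigned long long> RunQuestionPattern(unsigned iterations) {
    std::vector<unsigned long long> A = {1, 1};
    std::vector<unsigned long long> B = {0, 0};
    Barrier barrier(2);

    auto worker = [&](std::size_t i) {
        for (unsigned it = 0; it < iterations; ++it) {
            B[i] = A[0] + A[1];   // readA(); writeB();
            barrier.Await();      // nobody reads B until both wrote it
            A[i] = B[0] + B[1];   // readB(); writeA();
            barrier.Await();      // nobody reads A again until both wrote it
        }
    };

    std::thread t1(worker, 0);
    std::thread t2(worker, 1);
    t1.join();
    t2.join();
    return A;
}
```

    Because the barrier is reused, the loop needs a second Await(). With only one per iteration, a thread could start the next readA() while the other is still writing A. Each iteration multiplies both entries of A by 4, so after 10 iterations both are 4^10 = 1048576.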