Search code examples
c++linuxmultithreadingsemaphorecondition-variable

Is there a way to atomically flush a binary semaphore in C++ on Linux?


Some kernels provide a "flush" operation on semaphore to unblock all tasks waiting on a semaphore.

For example, VxWorks has a semFlush() API that atomically unblocks all tasks pended on a specified semaphore, i.e., all tasks will be unblocked before any is allowed to run.

I am implementing a C++ class on Linux that behaves like a Binary Semaphore and also has this "flush" functionality. Unfortunately, semaphore.h on Linux doesn't provide flush() or broadcast() like APIs.

What I have tried: Using condition variables to implement binary semaphore. Here's my pseudo code:

class BinarySem
{
    BinarySem();

    bool given;
    mutex m;
    condition_var cv;
    give();
    take();
    take( Timeout T );
    tryTake();
    flush();
}

BinarySem::BinarySem()
: given(false)
{}

// take(Timeout T), tryTake() not shown
// to make question concise on StackOverflow

BinarySem::give()
{
    {
        lock_guard lk(m);
        given = true;
    }
    cv.notify_one();
}   

BinarySem::flush()
{
    {
        lock_guard lk(m);
        given = true;
    }
    cv.notify_all();
}

BinarySem::take()
{
    unique_lock lk(m);
    while(!given)
    {
        cv.wait(lk);
    }
    given = false;
    lk.unlock();
}

However, this flush() won't behave in the correct way. Say, we have 2 threads waiting on a BinarySem (i.e. they both have called take()). Let these threads be hiPrioThread and loPrioThread.

When flush() is called on the BinarySem object, the hiPrioThread would exit from take() and run. When it yields (hiPrioThread just yields, it has not yet exited), the loPrioThread would still not be able to run because boolean given is now false again. The boolean is required as a protection against spurious wake-ups.

On the contrary, a semaphore's flush() function should just unblock all threads and they can run whenever they get a chance.

What if I don't set given = false at the end of take()? That will make my code vulnerable to spurious wake-ups and then multiple threads might get unblocked when give() is used.

Does anyone have any suggestions?


Solution

  • Borrow a concept from some "CyclicBarrier" implementations and have a generation or cycle counter.

    "Flushing" the semaphore is then advancing the generation. Each taker makes note of its generation before it waits, and the taker waits for the semaphore to be given or for the generation to change:

    BinarySem::flush() {
      {
        lock_guard lk(m);
        current_gen++;    // "flush" all waiters from the previous gen
        //given = true;   // No need to give; the 'current' taker will do this when done
      }
      cv.notify_all();
    }
    
    BinarySem::take() {
      lock_guard lk(m);
      uint64_t my_generation = current_gen;
      while (!given && my_generation == current_gen) {
        cv.wait(lk);
      }
      if (my_generation == current_gen) {
        given = false;
      }
    }
    

    (Warning: untested)