Edit: This is not a duplicate of any question that allows mutex locking in post(). Please read carefully: I need a lock-free post()! Don't mark this as a duplicate if you don't have a real answer.
A semaphore (like the one in Linux) is a useful building block that is not found in the C++ standard, nor in Boost (currently). I'm mainly talking about semaphores between threads of a single process, running over a preemptive scheduler.
I'm specifically interested in them being non-blocking (i.e. lock-free) unless they actually need to block. That is, post() and try_wait() should always be lock-free, and wait() calls should be lock-free if their invocations strongly-happen-after enough post()s have returned. Also, a blocking wait() should be blocked by the scheduler rather than spin-locked. What if I also want a wait_for with a timeout: how much does that complicate the implementation further, while still avoiding starvation?
Are there reasons why semaphores are not in the standard?
Edit 3: I wasn't aware that there's a proposal to the standard, P0514R4, that deals exactly with these issues and has solutions to all the problems raised here, apart from specifically adding an std::semaphore. http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0514r4.pdf
Boost doesn't have these either; specifically, the semaphores in Boost.Interprocess are spin-locked.
What libraries support something like that?
Is it possible to implement one over the Windows API and other widespread systems?
Edit: It is not possible to implement this lock-free using atomics + mutex + condition_variable: you either have to block in post() or spin in wait(). If you want a lock-free post(), you cannot lock a mutex in post(). I want to run on a possibly preemptive scheduler, and I don't want post() blocked by other threads that took the mutex and got preempted. So this is not a duplicate of questions like "C++0x has no semaphores? How to synchronize threads?"
Edit 2: Below is an example implementation, just to demonstrate the best that can be done with atomics + mutex + condvar, AFAIK. post() and wait() perform one lock-free atomic operation, and lock the mutex only if they must.
However, post() is not lock-free. Worse, it might be blocked by a wait() that locked the mutex and got preempted.
For simplicity, I only implemented post_one() and wait_one_for(Duration) instead of post(int) and wait_for(int, Duration). I'm also assuming no spurious wakeups, which is not promised by the standard.
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <boost/scope_exit.hpp>

class semaphore // provides acquire/release memory ordering for the user
{
private:
    using mutex_t = std::mutex;
    using unique_lock_t = std::unique_lock<mutex_t>;
    using condvar_t = std::condition_variable;
    using counter_t = int;

    // Shorthand for the memory orderings used below.
    static constexpr auto mo_relaxed = std::memory_order_relaxed;
    static constexpr auto mo_acquire = std::memory_order_acquire;
    static constexpr auto mo_acq_rel = std::memory_order_acq_rel;

    std::atomic<counter_t> atomic_count_{ 0 };
    mutex_t mutex_;
    condvar_t condvar_;
    counter_t posts_notified_pending_{ 0 };
    counter_t posts_unnotified_pending_{ 0 };
    counter_t waiters_running_{ 0 };
    counter_t waiters_aborted_pending_{ 0 };

public:
    void post_one()
    {
        counter_t start_count = atomic_count_.fetch_add(+1, mo_acq_rel);
        if (start_count < 0) {
            unique_lock_t lock(mutex_);
            if (0 < waiters_running_) {
                ++posts_notified_pending_;
                condvar_.notify_one();
            }
            else {
                if (0 == waiters_aborted_pending_) {
                    ++posts_unnotified_pending_;
                }
                else {
                    --waiters_aborted_pending_;
                }
            }
        }
    }

    template< typename Duration >
    bool wait_one_for(Duration timeout)
    {
        counter_t start_count = atomic_count_.fetch_add(-1, mo_acq_rel);
        if (start_count <= 0) {
            unique_lock_t a_lock(mutex_);
            ++waiters_running_;
            BOOST_SCOPE_EXIT(this_) {
                --this_->waiters_running_;
            } BOOST_SCOPE_EXIT_END
            if ((0 == posts_notified_pending_) && (0 < posts_unnotified_pending_)) {
                --posts_unnotified_pending_;
                return true;
            }
            else {
                auto wait_result = condvar_.wait_for(a_lock, timeout);
                switch (wait_result) {
                case std::cv_status::no_timeout: {
                    --posts_notified_pending_;
                    return true;
                } break;
                case std::cv_status::timeout: {
                    counter_t abort_count = atomic_count_.fetch_add(+1, mo_acq_rel);
                    if (abort_count >= 0) {
                        /* Enough post()s have already incremented the negative atomic_count_
                           and will try to notify; let them know we aborted. */
                        ++waiters_aborted_pending_;
                    }
                    return false;
                } break;
                default: assert(false); return false;
                }
            }
        }
        return true;
    }

    bool try_wait_one()
    {
        counter_t count = atomic_count_.load(mo_acquire);
        while (true) {
            if (count <= 0) {
                return false;
            }
            else if (atomic_count_.compare_exchange_weak(count, count - 1,
                                                         mo_acq_rel, mo_relaxed)) {
                return true;
            }
        }
    }
};
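For reference, here is a trivial usage sketch of the class above: one producer thread posts and one consumer waits with a one-second timeout (the timeout value is arbitrary).

#include <chrono>
#include <iostream>
#include <thread>

int main()
{
    semaphore sem;

    std::thread consumer([&] {
        // Blocks (via the scheduler) for up to one second waiting for a post.
        if (sem.wait_one_for(std::chrono::seconds(1)))
            std::cout << "got a post\n";
        else
            std::cout << "timed out\n";
    });

    std::thread producer([&] {
        sem.post_one();   // lock-free fast path unless a waiter is pending
    });

    producer.join();
    consumer.join();
}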
Yes, you can do this as long as your operating system offers a suitable "park" and "unpark" mechanism, which doesn't require taking a lock for unpark. Park refers to allowing a thread to go to sleep (OS blocking) and unpark refers to waking that thread.
You are already close with your atomic counter and condvar approach. The problem is that with a condvar, a mutex is required as part of its semantics. So you have to abandon condvars and go a bit lower level. First, pack all the state, such as the current semaphore value and whether there are any waiters (and possibly how many waiters), into a single atomic value, and manipulate it atomically with compare-and-exchange. This prevents the races that would occur if these were separate values.
Then you can draw a state diagram showing all the possible states of the semaphore, with edges for all the possible transitions (e.g., the "no waiters" state transitions to the "yes waiters" state when a waiter arrives). You implement every transition with compare-and-exchange, and whenever a compare-and-exchange fails you have to re-calculate the transition, since the state may have changed.
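For illustration, here is a minimal sketch of that idea: the semaphore value and a waiter count are packed into one 64-bit word, and a single transition (the uncontended acquire) is performed with a compare-exchange retry loop. The names and the field layout are assumptions made for this sketch, not part of any particular library.

#include <atomic>
#include <cstdint>

// Sketch: pack the semaphore value and the number of waiters into one 64-bit
// word so both can be inspected and updated by a single compare-exchange.
struct sem_state {
    std::uint32_t count;        // current semaphore value
    std::uint32_t num_waiters;  // threads parked or about to park
};

std::uint64_t pack(sem_state s)       { return (std::uint64_t(s.num_waiters) << 32) | s.count; }
sem_state     unpack(std::uint64_t w) { return { std::uint32_t(w), std::uint32_t(w >> 32) }; }

std::atomic<std::uint64_t> control{ 0 };

// One state transition done with compare-exchange: take a unit if the count is
// non-zero. On CAS failure the word is reloaded and the transition recomputed,
// because another thread may have moved the semaphore to a different state.
bool try_acquire_fast()
{
    std::uint64_t old_word = control.load(std::memory_order_acquire);
    for (;;) {
        sem_state s = unpack(old_word);
        if (s.count == 0)
            return false;                    // would have to park instead
        if (control.compare_exchange_weak(old_word, pack({ s.count - 1, s.num_waiters }),
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire))
            return true;                     // transition committed
        // old_word now holds the fresh state; loop and recompute.
    }
}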
Then you only need to implement the blocking. On Windows you would use Events, either auto-reset or manual-reset. Both have their advantages and quirks, and there is more than one way to skin this cat; for example, you can probably get it to work with a single shared, auto-reset event.
However, here's a sketch of one mechanism which uses a per-thread waiter object in a lock-free queue. The semaphore consists of an atomically manipulated control word, plus a lock-free list with element type waiter_node (or a stack, or whatever off-the-shelf concurrent list-like thing you want to use).
We will assume that each thread owns a waiter_node object which just contains a single manual reset Event object. This could be created once and stored in TLS (probably the most efficient), or allocated on demand every time a wait needs to occur and de-allocated when the wait is done.
Here's the basic outline:
- wait(): check the semaphore count and, if it is non-zero, decrement it with CAS and return. Otherwise, the waiting thread calls ResetEvent on its waiter_node, pushes the event onto the waiter list, checks that the semaphore value is still zero, and then calls WaitForSingleObject on its waiter_node. When that returns, it starts the wait routine over from the top.
- post(): increment the control word, then pop a waiter_node, if any, and call SetEvent on it.

There are all sorts of "races" here, such as a waiter_node being popped by a post operation before the waiting thread even sleeps on it, but they should be benign.
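To make the shape concrete, here is a rough Windows-specific sketch of that outline. The waiter list is a naive Treiber-style stack that ignores the ABA problem, nodes are allocated per wait and simply leaked, error handling is omitted, and the CAS fast path is my own addition; it is meant only to illustrate the structure (and the benign races) described above, not to be a finished implementation.

#include <atomic>
#include <windows.h>

// Per-thread (or per-wait) waiter object: just a manual-reset Event.
struct waiter_node {
    HANDLE event = CreateEventW(nullptr, TRUE /*manual reset*/, FALSE /*non-signaled*/, nullptr);
    waiter_node* next = nullptr;
};

std::atomic<long>         sem_count{ 0 };     // the semaphore value (control word)
std::atomic<waiter_node*> waiters{ nullptr }; // naive lock-free stack of waiters

void push_waiter(waiter_node* n)
{
    n->next = waiters.load(std::memory_order_relaxed);
    while (!waiters.compare_exchange_weak(n->next, n,
                                          std::memory_order_release,
                                          std::memory_order_relaxed)) {}
}

waiter_node* pop_waiter()
{
    waiter_node* top = waiters.load(std::memory_order_acquire);
    while (top && !waiters.compare_exchange_weak(top, top->next,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_acquire)) {}
    return top;  // nullptr if there was no waiter
}

void wait()
{
    for (;;) {
        // Fast path: grab a unit with CAS if the count is non-zero.
        long c = sem_count.load(std::memory_order_acquire);
        while (c > 0) {
            if (sem_count.compare_exchange_weak(c, c - 1,
                                                std::memory_order_acq_rel,
                                                std::memory_order_acquire))
                return;
        }
        // Slow path: announce ourselves, re-check, and park on our Event.
        // A fresh node per attempt sidesteps double-push; a real implementation
        // would reuse a TLS node and reclaim it safely. Here it simply leaks.
        waiter_node* node = new waiter_node;
        ResetEvent(node->event);              // only needed if the node were reused
        push_waiter(node);
        if (sem_count.load(std::memory_order_acquire) == 0)
            WaitForSingleObject(node->event, INFINITE);
        // Woken (or the count changed under us): start over from the top.
    }
}

void post()
{
    sem_count.fetch_add(1, std::memory_order_acq_rel);  // publish one unit
    if (waiter_node* w = pop_waiter())                   // wake one waiter, if any
        SetEvent(w->event);
    // Popping a node whose thread never actually parked is one of the benign
    // races mentioned above: that thread simply re-checks the count and retries.
}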
There are many variants even of this waiter-queue based design. For example, you may integrate the list "head" and the control word so they are the same thing; then the wait doesn't need to double-check the semaphore count, since the push operation verifies the semaphore state at the same time. You might also implement "direct handoff", where the posting thread doesn't increment the control word at all if there are waiters, but just pops one and wakes it with the information that it has successfully acquired the semaphore.
On Linux, you replace the Event with a futex. It is easier there to implement a "single futex" solution, since futex allows an atomic check-and-block operation inside the kernel that avoids a lot of the races inherent in the Event solution. So a basic sketch is a single control word: you make your transitions atomically with CAS, and then use futex() with FUTEX_WAIT to do a second check of the control word and block atomically (this atomic check-and-sleep is the power of futex).
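As a hedged illustration of that Linux sketch, here is a minimal single-control-word semaphore using the raw futex syscall (glibc provides no wrapper). Waking on every post, even when nobody waits, and passing the address of the std::atomic<int> directly to the kernel (layout-compatible with int on common implementations, but not formally guaranteed) are simplifications assumed for brevity.

#include <atomic>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Minimal sketch of the single-control-word + futex design (Linux only).
class futex_semaphore {
    std::atomic<int> count_;

    static long sys_futex(void* addr, int op, int val) {
        // glibc has no futex() wrapper, so invoke the raw syscall.
        return syscall(SYS_futex, addr, op, val, nullptr, nullptr, 0);
    }

public:
    explicit futex_semaphore(int initial = 0) : count_(initial) {}

    // post(): one lock-free atomic increment, then wake at most one waiter.
    // Waking when nobody waits is wasteful but harmless: woken threads re-check.
    void post() {
        count_.fetch_add(1, std::memory_order_release);
        sys_futex(&count_, FUTEX_WAKE, 1);
    }

    // try_wait(): lock-free CAS loop, never blocks.
    bool try_wait() {
        int c = count_.load(std::memory_order_acquire);
        while (c > 0) {
            if (count_.compare_exchange_weak(c, c - 1,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire))
                return true;
        }
        return false;
    }

    // wait(): lock-free as long as units are available; otherwise FUTEX_WAIT
    // re-checks inside the kernel that count_ is still 0 and sleeps atomically,
    // so a concurrent post() cannot slip between the check and the block.
    void wait() {
        while (!try_wait())
            sys_futex(&count_, FUTEX_WAIT, 0);
    }
};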