Hey there!
I have already looked up similar problems here, but didn't arrive at a final solution.
In my application, I have two processes running concurrently that need to be synchronized via shared memory. Currently I'm using a pthread_mutex_t and a pthread_cond_t for that purpose, both of which are placed in shared memory.
This works fine until process A crashes whilst waiting on the condition. If process A is then restarted, a deadlock(?) occurs in which process A waits on the condition and process B is stuck indefinitely in the call to pthread_cond_broadcast.
I read that this may be due to the mutex being in an inconsistent state, but in fact it never seems to be in that state in my program. I would appreciate it if you could tell me whether I have misunderstood something, whether there are alternative approaches to solving this, or whether it isn't possible at all to guard against this kind of crash.
struct Poller
{
private:
    std::atomic<bool> waiting;
    pthread_mutex_t mtx;
    pthread_cond_t cnd;

    auto lockMutex() -> void
    {
        // Recover Mutex if any process locking it died unexpectedly
        LOG_DEBUG(info, "AdaptivePoller", "Locking mutex...");
        if(pthread_mutex_lock(&mtx) == EOWNERDEAD) {
            LOG_DEBUG(info, "AdaptivePoller", "Mutex in inconsistent state.");
            if(pthread_mutex_consistent(&mtx) != 0) {
                throw std::runtime_error("Mutex could not be brought back into consistent state.");
            }
        }
    }

public:
    Poller()
    {
        waiting = false;

        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(&mtx, &attr);
        pthread_mutexattr_destroy(&attr);

        pthread_condattr_t attrcond;
        pthread_condattr_init(&attrcond);
        pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
        pthread_cond_init(&cnd, &attrcond);
        pthread_condattr_destroy(&attrcond);
    }

    auto wait() -> void
    {
        lockMutex();
        waiting = true;
        LOG_DEBUG(info, "AdaptivePoller", "Start waiting...");
        while(waiting) {
            pthread_cond_wait(&cnd, &mtx);
        }
        pthread_mutex_unlock(&mtx);
    }

    auto notify() -> void
    {
        lockMutex();
        waiting = false;
        LOG_DEBUG(info, "AdaptivePoller", "Notifying...");
        pthread_cond_broadcast(&cnd);
        pthread_mutex_unlock(&mtx);
    }
};
As far as I am aware, the POSIX API does not define mechanisms sufficient to make your Poller wholly robust against all process-failure scenarios. But you might achieve a situation you consider an improvement by using a SysV semaphore in place of the condition variable and mutex. Here's an outline:
A semaphore can_proceed provides for processes to block themselves until released. This semaphore has initial value 0, and processes block themselves by attempting to decrement it.
A notifying process releases the currently blocked processes by incrementing can_proceed by the number of blocked processes. The SysV semaphore API provides an operation for determining how many that is (semctl() with the GETNCNT command).
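The outline above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a hardened implementation: the names semun_arg, create_can_proceed, wait_for_release and release_waiters are hypothetical, the semaphore set is created with IPC_PRIVATE purely for brevity (so it is only reachable by processes that inherit the ID, for example via fork()), and error handling is reduced to exceptions.

```cpp
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <cerrno>
#include <stdexcept>

// On Linux the caller has to define the union passed as semctl()'s fourth
// argument (see semctl(2)); the name semun_arg is this sketch's own choice.
union semun_arg {
    int val;
    struct semid_ds* buf;
    unsigned short* array;
};

// Creates a one-semaphore set and initializes can_proceed to 0.
// IPC_PRIVATE is an assumption made for brevity.
int create_can_proceed() {
    const int semid = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
    if (semid == -1) throw std::runtime_error("semget failed");
    semun_arg arg;
    arg.val = 0;
    if (semctl(semid, 0, SETVAL, arg) == -1)
        throw std::runtime_error("semctl(SETVAL) failed");
    return semid;
}

// wait(): block by attempting to decrement can_proceed.
void wait_for_release(int semid) {
    struct sembuf op;
    op.sem_num = 0;
    op.sem_op = -1;
    op.sem_flg = 0;
    while (semop(semid, &op, 1) == -1) {
        // Retry if a signal interrupted the call; anything else is an error.
        if (errno != EINTR) throw std::runtime_error("semop(-1) failed");
    }
}

// notify(): add one token per process currently blocked in the decrement.
// Returns the number of processes that were counted as blocked.
int release_waiters(int semid) {
    const int blocked = semctl(semid, 0, GETNCNT);
    if (blocked == -1) throw std::runtime_error("semctl(GETNCNT) failed");
    if (blocked == 0) return 0;  // sem_op == 0 would mean "wait for zero"
    struct sembuf op;
    op.sem_num = 0;
    op.sem_op = static_cast<short>(blocked);
    op.sem_flg = 0;
    if (semop(semid, &op, 1) == -1)
        throw std::runtime_error("semop(+n) failed");
    return blocked;
}
```

The read of GETNCNT and the following increment are two separate system calls, so the count can be stale by the time the increment happens.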
The above is not infallible. At least these cases can occur:
A waiting process W dies after a notifying process N reads a waiter count that includes W, but before W completes its semaphore decrement. This will allow one future waiter to pass through without blocking.
A waiting process W2 arrives in wait() between when a notifying process has incremented can_proceed and when the last of the previously-waiting processes completes its semaphore decrement. It is possible in this case that W2 proceeds immediately instead of some process W1 that was already waiting at the time of the notification. In this case, W1 would be released by the next notification (unless the same thing happened to it again).
If two processes attempt to notify at about the same time then one could observe a waiter count that reflected processes that had already been released, but had not yet completed their semaphore decrement. The likely outcome is that one or more future processes would pass through wait() without blocking.
Inasmuch as two of those are analogous to spurious return from a CV wait, they could be addressed via a similar predicate-checking idiom. That would be tricky given the fact that there is necessarily a gap between a process completing its semaphore decrement and it acquiring a mutex, but perhaps that could be worked around by using an atomic object, so that no mutex is required.
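One way the predicate check with an atomic object might look is sketched below. Everything here is an assumption for illustration: the Gate struct, its generation counter, and the functions snapshot, wait_since and notify_all are hypothetical names, and Gate is assumed to live in the shared-memory segment. A waiter samples the counter on entry and treats a wake-up as genuine only if the counter has moved on since then, so a leftover token merely sends it around the loop again. The sketch does not help a waiter whose token was consumed by a latecomer; that waiter still has to wait for the next notification.

```cpp
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <atomic>
#include <cerrno>
#include <stdexcept>

static_assert(ATOMIC_INT_LOCK_FREE == 2,
              "the counter must be lock-free to be shared between processes");

// Hypothetical state kept in shared memory, constructed once (placement new).
struct Gate {
    std::atomic<unsigned> generation{0};
};

// First step of wait(): remember the notification round we arrived in.
unsigned snapshot(const Gate& gate) {
    return gate.generation.load();
}

// Second step of wait(): decrement can_proceed until the round has changed.
// If a notification already happened since the snapshot, this returns at once.
void wait_since(int semid, const Gate& gate, unsigned seen) {
    while (gate.generation.load() == seen) {
        struct sembuf op;
        op.sem_num = 0;
        op.sem_op = -1;
        op.sem_flg = 0;
        if (semop(semid, &op, 1) == -1 && errno != EINTR)
            throw std::runtime_error("semop(-1) failed");
    }
}

// notify(): start a new round first, then release the processes that are
// blocked right now. Returns the number of processes counted as blocked.
int notify_all(int semid, Gate& gate) {
    gate.generation.fetch_add(1);
    const int blocked = semctl(semid, 0, GETNCNT);
    if (blocked == -1) throw std::runtime_error("semctl(GETNCNT) failed");
    if (blocked > 0) {
        struct sembuf op;
        op.sem_num = 0;
        op.sem_op = static_cast<short>(blocked);
        op.sem_flg = 0;
        if (semop(semid, &op, 1) == -1)
            throw std::runtime_error("semop(+n) failed");
    }
    return blocked;
}
```

A waiter would call wait_since(semid, gate, snapshot(gate)). Incrementing the counter before reading the waiter count means a process that sampled the old round but has not yet blocked sees the change and returns without needing a token.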
It is also possible that a process dies in notify() before incrementing can_proceed, such that that notification is ineffective. I don't account that a weakness specific to the suggested scheme.
Note that SysV semaphores do not themselves reside in or require shared memory.
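Because the semaphore is a kernel object identified by a key, two unrelated processes can find it without any shared mapping. The following is a rough sketch of that lookup, with hypothetical names (semun_arg, open_can_proceed) and the assumption that both processes agree on the key, for instance by calling ftok() with the same path and project ID. Note the small window between creation and initialization in which another process could already attach; a production version would need to close that gap.

```cpp
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <cerrno>
#include <stdexcept>

// Caller-defined union for semctl()'s fourth argument (see semctl(2));
// the name semun_arg is this sketch's own choice.
union semun_arg {
    int val;
    struct semid_ds* buf;
    unsigned short* array;
};

// Returns the ID of the one-semaphore set for `key`. The process that
// creates the set also sets can_proceed to 0; later callers just attach.
int open_can_proceed(key_t key) {
    int semid = semget(key, 1, IPC_CREAT | IPC_EXCL | 0600);
    if (semid != -1) {
        // We created it, so we are responsible for the initial value.
        semun_arg arg;
        arg.val = 0;
        if (semctl(semid, 0, SETVAL, arg) == -1)
            throw std::runtime_error("semctl(SETVAL) failed");
        return semid;
    }
    if (errno != EEXIST) throw std::runtime_error("semget(create) failed");
    // Someone else created it already: attach to the existing set.
    semid = semget(key, 1, 0);
    if (semid == -1) throw std::runtime_error("semget(attach) failed");
    return semid;
}
```

Unlike the shared-memory Poller, the set outlives the processes that use it until it is removed explicitly with semctl(semid, 0, IPC_RMID), so a restarted process A simply attaches to the same semaphore again.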