I have been working on a idea for a system where I can have many workers that are triggered on a regular basis by a a central timer class. The part I'm concerned about here is a TriggeredWorker
which, in a loop, uses the mutex
& conditionVariable
approach to wait to be told to do work. It has a method trigger
that is called (by a different thread) that triggers work to be done. It is an abstract class that has to be subclassed for the actual work
method to be implemented.
I have a test that shows that this mechanism works. However, as I increase the load by reducing the trigger interval, the test starts to fail. When I delay 20 microseconds between triggers, the test is 100% reliable. As I reduce down to 1 microsecond, I start to get failures in that the count of work performed reduces from 1000 (expected) to values like 986, 933, 999 etc..
My questions are: (1) what is it that is going wrong and how can I capture what is going wrong so I can report it or do something about it? And, (2) is there some better approach that I could use that would be better? I have to admit that my experience with c++ is limited to the last 3 months, although I have worked with other languages for several years.
Many thanks for reading...
Here are the key bits of code:
Triggered worker header file:
#ifndef TIMER_TRIGGERED_WORKER_H
#define TIMER_TRIGGERED_WORKER_H
#include <thread>
#include <plog/Log.h>
class TriggeredWorker {
private:
std::mutex mutex_;
std::condition_variable condVar_;
std::atomic<bool> running_{false};
std::atomic<bool> ready_{false};
void workLoop();
protected:
virtual void work() {};
public:
void start();
void stop();
void trigger();
};
#endif //TIMER_TRIGGERED_WORKER_H
Triggered worker implementation:
#include "TriggeredWorker.h"
void TriggeredWorker::workLoop() {
PLOGD << "workLoop started...";
while(true) {
std::unique_lock<std::mutex> lock(mutex_);
condVar_.wait(lock, [this]{
bool ready = this->ready_;
bool running = this->running_;
return ready | !running; });
this->ready_ = false;
if (!this->running_) {
break;
}
PLOGD << "Calling work()...";
work();
lock.unlock();
condVar_.notify_one();
}
PLOGD << "Worker thread completed.";
}
void TriggeredWorker::start() {
PLOGD << "Worker start...";
this->running_ = true;
auto thread = std::thread(&TriggeredWorker::workLoop, this);
thread.detach();
}
void TriggeredWorker::stop() {
PLOGD << "Worker stop.";
this->running_ = false;
}
void TriggeredWorker::trigger() {
PLOGD << "Trigger.";
std::unique_lock<std::mutex> lock(mutex_);
ready_ = true;
lock.unlock();
condVar_.notify_one();
}
and the test:
#include "catch.hpp"
#include "TriggeredWorker.h"
#include <thread>
TEST_CASE("Simple worker performs work when triggered") {
static std::atomic<int> twt_count{0};
class SimpleTriggeredWorker : public TriggeredWorker {
protected:
void work() override {
PLOGD << "Incrementing counter.";
twt_count.fetch_add(1);
}
};
SimpleTriggeredWorker worker;
worker.start();
for (int i = 0; i < 1000; i++) {
worker.trigger();
std::this_thread::sleep_for(std::chrono::microseconds(20));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(twt_count == 1000);
std::this_thread::sleep_for(std::chrono::seconds(1));
worker.stop();
}
What happens when worker.trigger()
is called twice before workLoop
acquires the lock? You loose one of those "triggers". Smaller time gap means higher probability of test failure, because of higher probability of multiple consecutive worker.trigger()
calls before workLoop
wakes up. Note that there's nothing that guarantees that workLoop
will acquire the lock after worker.trigger()
but before another worker.trigger()
happens, even when those calls happen one after another (i.e. not in parallel). This is governed by the OS scheduler and we have no control over it.
Anyway the core problem is that setting ready_ = true
twice looses information. Unlike incrementing an integer twice. And so the simplest solution is to replace bool
with int
and do inc/dec with == 0
checks. This solution is also known as semaphore. More advanced (potentially better, especially when you need to pass some data to the worker) approach is to use a (bounded?) thread safe queue. That depends on what exactly you are trying to achieve.
BTW 1: all your reads and updates, except for stop()
function (and start()
but this isn't really relevant), happen under the lock. I suggest you fix stop()
to be under lock as well (since it is rarely called anyway) and turn atomics into non-atomics. There's an unnecessary overhead of atomics at the moment.
BTW 2: I suggest not using thread.detach()
. You should store the std::thread
object on TriggeredWorker
and add destructor that does stop
with join
. These are not independent beings and so without detach()
you make your code safer (one should never die without the other).