In the following code, I create a toy class with one thread that writes to a queue while another thread reads from that queue and prints each item to stdout. To shut the system down cleanly, I set up a handler for SIGINT. I expect the signal handler to set the std::atomic<bool> variable stopFlag, which will lead threadB to push a poison pill (sentinel) onto the queue; on encountering it, threadA will halt.
// Toy producer/consumer: threadB posts strings to outQueue, threadA pops and prints them.
// Non-copyable; both threads are started from the constructor via init().
class TestClass
{
public:
TestClass();
// Joins threadB only; threadA is joined by threadB inside postResults().
~TestClass();
// Requests shutdown by setting stopFlag; does not wait for the threads.
void shutDown();
TestClass(const TestClass&) = delete;
TestClass& operator=(const TestClass&) = delete;
private:
// Starts threadA (processResults) and threadB (postResults).
void init();
// Producer loop run by threadB.
void postResults();
// Blocks until outQueue is non-empty, then pops and returns the front item.
std::string getResult();
// Consumer loop run by threadA; stops on the first empty string it receives.
void processResults();
// Set by shutDown(), polled by postResults() after each push.
std::atomic<bool> stopFlag;
// Guards outQueue; used together with outQueueConditionVariable.
std::mutex outQueueMutex;
// Notified by postResults() after every push; waited on in getResult().
std::condition_variable outQueueConditionVariable;
std::queue<std::string> outQueue;
// Consumer thread (runs processResults).
std::unique_ptr<std::thread> threadA;
// Producer thread (runs postResults).
std::unique_ptr<std::thread> threadB;
};
void TestClass::init()
{
threadA = std::make_unique<std::thread>(&TestClass::processResults, std::ref(*this));
threadB = std::make_unique<std::thread>(&TestClass::postResults, std::ref(*this));
}
TestClass::TestClass():
stopFlag(false)
{
init();
}
// Blocks until the producer thread (threadB) has finished. threadA is not
// joined here: postResults() joins it before threadB exits. Nothing in this
// destructor sets stopFlag, so it only returns once shutDown() has been called.
TestClass::~TestClass()
{
    std::thread& producer = *threadB;
    producer.join();
}
void TestClass::postResults()
{
while(true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::string name = "ABCDEF";
{
std::unique_lock<std::mutex> lock(outQueueMutex);
outQueue.push(name);
outQueueConditionVariable.notify_one();
}
if(stopFlag)
{
/*For shutting down output thread*/
auto poisonPill = std::string();
{
std::unique_lock<std::mutex> lock(outQueueMutex);
outQueue.push(poisonPill);
outQueueConditionVariable.notify_one();
}
threadA->join();
break;
}
}
}
// Requests shutdown by raising stopFlag. Returns immediately; the producer
// notices the flag after its next push.
void TestClass::shutDown()
{
    stopFlag.store(true);
}
std::string TestClass::getResult()
{
std::string result;
{
std::unique_lock<std::mutex> lock(outQueueMutex);
while(outQueue.empty())
{
outQueueConditionVariable.wait(lock);
}
result= outQueue.front();
outQueue.pop();
}
return result;
}
void TestClass::processResults()
{
while(true)
{
const auto result = getResult();
if(result.empty())
{
break;
}
std::cout << result << std::endl;
}
}
static void sigIntHandler(std::shared_ptr<TestClass> t, int)
{
t->shutDown();
}
// File-scope callable that the capture-less lambda registered in main() forwards SIGINT to.
static std::function<void(int)> handler;
// Creates the TestClass instance and installs a SIGINT handler that calls its shutDown().
int main()
{
auto testClass = std::make_shared<TestClass>();
// Bind a copy of the shared_ptr into the global handler; _1 receives the signal number.
handler = std::bind(sigIntHandler, testClass, std::placeholders::_1);
// std::signal takes a plain function pointer, hence the capture-less lambda going through the global.
std::signal(SIGINT, [](int n){ handler(n);});
// NOTE(review): main returns right after installing the handler; nothing here waits for a signal.
return 0;
}
I compiled this with gcc 5.2 using the -std=c++14 flag. On hitting Ctrl-C on my CentOS 7 machine, I get the following error:
terminate called after throwing an instance of 'std::system_error'
what(): Invalid argument
Aborted (core dumped)
Please help me understand what is going on.
What happens is that your main function returns immediately, which destroys the local testClass pointer and then the global handler object; since handler holds the last shared_ptr copy, this destroys the TestClass instance as well. The main thread then blocks in TestClass::~TestClass. The signal handler ends up accessing already destroyed objects, which leads to undefined behaviour.
The root cause is unclear object ownership due to the shared pointers: you do not know what ends up destroying your objects, or when.
A more general approach is to use another thread to handle all signals and to block signals in all other threads. That signal-handling thread can then call any function upon receiving a signal.
You also do not need the smart pointers and function wrappers here at all.
Example:
// Producer/consumer pair: threadB posts strings to outQueue, threadA pops and prints them.
// Both threads are started by the constructor and joined by the destructor.
class TestClass
{
public:
TestClass();
// Joins threadA and then threadB.
~TestClass();
// Sets stop under the queue mutex and notifies the condition variable.
void shutDown();
TestClass(const TestClass&) = delete;
TestClass& operator=(const TestClass&) = delete;
private:
// Producer loop run by threadB; returns once stop is set.
void postResults();
// Waits for an item or for stop; returns an empty string when stop is set.
std::string getResult();
// Consumer loop run by threadA; ends on the first empty result.
void processResults();
// Guards outQueue and stop.
std::mutex outQueueMutex;
std::condition_variable outQueueConditionVariable;
std::queue<std::string> outQueue;
// Shutdown request; every read and write happens with outQueueMutex held.
bool stop = false;
// The threads are declared last: members are initialised in declaration order,
// so the mutex, condition variable, queue and flag exist before either thread starts.
std::thread threadA;
std::thread threadB;
};
// Starts the consumer (threadA) and the producer (threadB) on this object.
TestClass::TestClass()
    : threadA(&TestClass::processResults, this)
    , threadB(&TestClass::postResults, this)
{}
// Stops and joins both worker threads.
// shutDown() is called first so that destroying an instance whose owner never
// requested shutdown cannot block forever in join(); calling it again after an
// explicit shutDown() is harmless because it only sets the flag and notifies.
// The join of threadB may still take until its current sleep in postResults() ends.
TestClass::~TestClass() {
    shutDown();
    threadA.join();
    threadB.join();
}
// Producer loop (runs on threadB). Every 2000 ms it takes the queue lock and
// either returns, if stop has been set, or queues "ABCDEF" and wakes the consumer.
void TestClass::postResults() {
    const std::string payload = "ABCDEF";
    for(;;) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));

        // The lock covers both the stop check and the push.
        std::lock_guard<std::mutex> guard(outQueueMutex);
        if(stop)
            break;
        outQueue.push(payload);
        outQueueConditionVariable.notify_one();
    }
}
// Requests shutdown: sets stop while holding the queue mutex and wakes one
// waiter on the condition variable so a blocked getResult() can observe it.
void TestClass::shutDown() {
    std::lock_guard<std::mutex> guard(outQueueMutex);
    stop = true;
    outQueueConditionVariable.notify_one();
}
std::string TestClass::getResult() {
std::string result;
{
std::unique_lock<std::mutex> lock(outQueueMutex);
while(!stop && outQueue.empty())
outQueueConditionVariable.wait(lock);
if(stop)
return result;
result= outQueue.front();
outQueue.pop();
}
return result;
}
void TestClass::processResults()
{
while(true) {
const auto result = getResult();
if(result.empty())
break;
std::cout << result << std::endl;
}
}
// Runs TestClass until a signal arrives, then shuts it down cleanly.
// All signals are blocked before any other thread is created, and a dedicated
// thread accepts them synchronously, so no asynchronous signal handler is needed.
int main() {
    // Block signals in all threads (the mask is set before any other thread is created).
    sigset_t sigset;
    sigfillset(&sigset);
    ::pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

    TestClass testClass;

    std::thread signal_thread([&testClass, &sigset]() {
        // The signals stay blocked in this thread too; sigwait() takes the next
        // pending one off the queue instead of letting it be delivered.
        // sigwait() is used rather than sigwaitinfo() because it reports errors
        // through its return value and is not specified to fail with EINTR,
        // so a stop/continue of the process cannot end up in abort().
        int signo = 0;
        if(0 != ::sigwait(&sigset, &signo))
            std::abort();
        std::cout << "Received signal " << signo << '\n';
        testClass.shutDown();
    });
    signal_thread.join();
}