c++, c++11, signals, signal-handling

Shutting down a multithreaded application by installing a signal handler


In the following code, I create a toy class in which one thread writes to a queue while another thread reads from that queue and prints to stdout. To shut the system down cleanly, I set up a handler for SIGINT. I expect the signal handler to set the std::atomic<bool> variable stopFlag, which will lead threadB to push a poison pill (sentinel) onto the queue; on encountering it, threadA will halt.

class TestClass
{
public:

    TestClass();
    ~TestClass();
    void shutDown();

    TestClass(const TestClass&) = delete;
    TestClass& operator=(const TestClass&) = delete;


private:
    void init();
    void postResults();
    std::string getResult();
    void processResults();

    std::atomic<bool> stopFlag;

    std::mutex outQueueMutex;
    std::condition_variable outQueueConditionVariable;
    std::queue<std::string> outQueue;

    std::unique_ptr<std::thread> threadA;
    std::unique_ptr<std::thread> threadB;
};

void TestClass::init()
{
    threadA = std::make_unique<std::thread>(&TestClass::processResults, std::ref(*this));
    threadB = std::make_unique<std::thread>(&TestClass::postResults, std::ref(*this));
}

TestClass::TestClass():
    stopFlag(false)
{
    init();
}

TestClass::~TestClass()
{
    threadB->join();
}

void TestClass::postResults()
{
    while(true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::string name = "ABCDEF";
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            outQueue.push(name);
            outQueueConditionVariable.notify_one();
        }
        if(stopFlag)
        {
            /*For shutting down output thread*/
            auto poisonPill = std::string();
            {
                std::unique_lock<std::mutex> lock(outQueueMutex);
                outQueue.push(poisonPill);
                outQueueConditionVariable.notify_one();
            }
            threadA->join();
            break;
        }
    }
}

void TestClass::shutDown()
{
    stopFlag = true;
}

std::string TestClass::getResult()
{
    std::string result;
    {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        while(outQueue.empty())
        {
            outQueueConditionVariable.wait(lock);
        }
        result= outQueue.front();
        outQueue.pop();
    }
    return result;
}

void TestClass::processResults()
{
    while(true)
    {
        const auto result = getResult();

        if(result.empty())
        {
            break;
        }

        std::cout << result << std::endl;

    }
}

static void sigIntHandler(std::shared_ptr<TestClass> t, int)
{
    t->shutDown();
}
static std::function<void(int)> handler;

int main()
{
    auto testClass = std::make_shared<TestClass>();
    handler = std::bind(sigIntHandler, testClass, std::placeholders::_1);
    std::signal(SIGINT, [](int n){ handler(n);});
    return 0;
}

I compiled this with gcc 5.2 using the -std=c++14 flag. On hitting Ctrl-C on my CentOS 7 machine, I get the following error:

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
Aborted (core dumped)

Please help me understand what is going on.


Solution

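  • Your main function returns immediately. That releases the local testClass pointer and then, during static destruction, destroys the global handler object, which holds the last reference to the TestClass instance. The main thread then blocks in TestClass::~TestClass, waiting in threadB->join(). When the signal arrives, the handler ends up accessing objects that have already been destroyed, which is undefined behaviour.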
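
The explanation above does not say which call actually throws, and with undefined behaviour that cannot be stated with certainty. As a hedged illustration only: one well-known way to get exactly this message is calling join() on a std::thread that is not joinable, which is specified to throw std::system_error with std::errc::invalid_argument. The helper name joinErrorCode is hypothetical and not part of the original code.

```cpp
#include <cassert>
#include <system_error>
#include <thread>

// Hypothetical helper: joins a thread that was never started and reports
// the error code carried by the resulting exception.
std::error_code joinErrorCode()
{
    std::thread notStarted;              // default-constructed
    assert(!notStarted.joinable());      // so it is not joinable
    try {
        notStarted.join();               // specified to throw std::system_error
    } catch (const std::system_error& e) {
        return e.code();
    }
    return std::error_code();            // only reached if join() did not throw
}
```

If such an exception escapes uncaught, the runtime calls std::terminate, which matches the "terminate called after throwing" prefix in the output shown in the question.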

    The root cause is unclear object ownership caused by the shared pointers: you cannot easily tell what ends up destroying your objects, or when.
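
To illustrate the ownership problem, the following minimal sketch mimics the question's setup with a hypothetical Tracked type standing in for TestClass (Tracked, onSignal, and wrapperIsLastOwner are illustrative names, not from the original code). It suggests that the std::function wrapper, not the local pointer, is the last owner, so the object is destroyed only when the wrapper is.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for TestClass: it only records its own destruction.
struct Tracked {
    explicit Tracked(bool& flag) : destroyed(flag) {}
    ~Tracked() { destroyed = true; }
    bool& destroyed;
};

// Same shape as sigIntHandler in the question; the body is irrelevant here.
static void onSignal(std::shared_ptr<Tracked>, int) {}

// Returns true if the bound wrapper, not the local pointer, was the last owner.
bool wrapperIsLastOwner()
{
    bool destroyed = false;
    auto local = std::make_shared<Tracked>(destroyed);
    assert(local.use_count() == 1);

    // std::bind stores a copy of the shared_ptr inside the wrapper.
    std::function<void(int)> wrapper =
        std::bind(onSignal, local, std::placeholders::_1);

    local.reset();                          // like testClass going out of scope in main
    const bool aliveAfterReset = !destroyed;

    wrapper = nullptr;                      // like the global handler being destroyed
    return aliveAfterReset && destroyed;
}
```

In the question, the equivalent of `wrapper = nullptr` happens implicitly after main returns, which is why the destructor runs at a point that is easy to overlook.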


    A more general approach is to block signals in all other threads and use one dedicated thread to handle all signals. That signal-handling thread can then call any function upon receiving a signal, because it runs as ordinary thread code rather than inside an asynchronous signal handler.
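
Blocking signals "in all other threads" relies on the POSIX rule that a new thread inherits the signal mask of the thread that creates it, so blocking in main before starting any thread is enough. A minimal sketch of that property, assuming a POSIX system; the helper name newThreadInheritsBlockedSigint is hypothetical:

```cpp
#include <signal.h>

#include <cassert>
#include <thread>

// Hypothetical helper: blocks SIGINT in the calling thread, then asks a newly
// created thread whether SIGINT is blocked there as well.
bool newThreadInheritsBlockedSigint()
{
    sigset_t toBlock;
    sigemptyset(&toBlock);
    sigaddset(&toBlock, SIGINT);

    sigset_t previous;
    if (pthread_sigmask(SIG_BLOCK, &toBlock, &previous) != 0)
        return false;

    bool blockedInChild = false;
    std::thread child([&blockedInChild] {
        sigset_t current;
        // With a null "set" argument, pthread_sigmask only reports the current mask.
        if (pthread_sigmask(SIG_BLOCK, nullptr, &current) == 0)
            blockedInChild = (sigismember(&current, SIGINT) == 1);
    });
    child.join();

    // Restore the caller's original mask so the helper has no lasting effect.
    const int rc = pthread_sigmask(SIG_SETMASK, &previous, nullptr);
    assert(rc == 0);
    (void)rc;
    return blockedInChild;
}
```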

    You also do not need the smart pointers and function wrappers here at all.

    Example:

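    #include <signal.h>
    #include <chrono>
    #include <condition_variable>
    #include <cstdlib>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>
    
    class TestClass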
    {
    public:
        TestClass();
        ~TestClass();
        void shutDown();
    
        TestClass(const TestClass&) = delete;
        TestClass& operator=(const TestClass&) = delete;
    
    private:
        void postResults();
        std::string getResult();
        void processResults();
    
    
        std::mutex outQueueMutex;
        std::condition_variable outQueueConditionVariable;
        std::queue<std::string> outQueue;
        bool stop = false;
    
        std::thread threadA;
        std::thread threadB;
    };
    
    TestClass::TestClass()
        : threadA(std::thread(&TestClass::processResults, this))
        , threadB(std::thread(&TestClass::postResults, this))
    {}
    
    TestClass::~TestClass() {
        threadA.join();
        threadB.join();
    }
    
    void TestClass::postResults() {
        while(true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
            std::string name = "ABCDEF";
            {
                std::unique_lock<std::mutex> lock(outQueueMutex);
                if(stop)
                    return;
                outQueue.push(name);
                outQueueConditionVariable.notify_one();
            }
        }
    }
    
    void TestClass::shutDown() {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        stop = true;
        outQueueConditionVariable.notify_one();
    }
    
    std::string TestClass::getResult() {
        std::string result;
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            while(!stop && outQueue.empty())
                outQueueConditionVariable.wait(lock);
            if(stop)
                return result;
            result= outQueue.front();
            outQueue.pop();
        }
        return result;
    }
    
    void TestClass::processResults()
    {
        while(true) {
            const auto result = getResult();
            if(result.empty())
                break;
            std::cout << result << std::endl;
        }
    }
    
    int main() {
        // Block all signals in this thread; threads created later inherit the mask.
        sigset_t sigset;
        sigfillset(&sigset);
        ::pthread_sigmask(SIG_BLOCK, &sigset, nullptr);
    
        TestClass testClass;
    
        std::thread signal_thread([&testClass]() {
            // Wait for any blocked signal to become pending; the signals stay blocked.
            sigset_t sigset;
            sigfillset(&sigset);
            int signo = ::sigwaitinfo(&sigset, nullptr);
            if(-1 == signo)
                std::abort();
    
            std::cout << "Received signal " << signo << '\n';
            testClass.shutDown();
        });
    
        signal_thread.join();
    }
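
The example above blocks every signal with sigfillset. POSIX leaves the behaviour undefined if SIGFPE, SIGILL, SIGSEGV, or SIGBUS is generated by a fault while blocked, so a possible variation is to block and wait only for the signals that should trigger a shutdown. The sketch below assumes SIGINT and SIGTERM are the only such signals; the helper names shutdownSignals, blockShutdownSignals, and waitForShutdownSignal are hypothetical.

```cpp
#include <signal.h>

#include <cassert>

// Hypothetical helper: builds the set of signals treated as shutdown requests.
sigset_t shutdownSignals()
{
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGINT);
    sigaddset(&set, SIGTERM);
    return set;
}

// Blocks the shutdown signals in the calling thread. Intended to be called in
// main() before any thread is created, so that every thread inherits the mask.
void blockShutdownSignals()
{
    const sigset_t set = shutdownSignals();
    const int rc = pthread_sigmask(SIG_BLOCK, &set, nullptr);
    assert(rc == 0);
    (void)rc;
}

// Waits until one of the blocked shutdown signals is pending.
// Returns the signal number, or -1 if sigwait() fails.
int waitForShutdownSignal()
{
    const sigset_t set = shutdownSignals();
    int signo = 0;
    if (sigwait(&set, &signo) != 0)
        return -1;
    return signo;
}
```

In the example's main, blockShutdownSignals() would take the place of the sigfillset and pthread_sigmask calls, and the signal thread would call waitForShutdownSignal() before testClass.shutDown().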