Search code examples
c++linuxmultithreadingsignalslibev

Libev c++ can't stop dynamic loops in multithread application


I write multithread server with c++ and libev and has one problem. I start default_loop in main thread and create and start dynamic_loop in every slave threads. When I handle signals to stop process I call break_loop(ev::ALL). After that default event loop stopped but all dynamic loops work, need help to stop dynamic loops.

Environment: OS: Opensuse 13.1

compiler: g++ (SUSE Linux) 4.8.1 20130909

libev: 4.15

source:

#include <vector>
#include <memory>
#include <ev++.h>
#include <thread>
#include <iostream>

class Thread 
{
public:
    typedef std::thread thread_t;
    Thread() {std::cout << "thread created" << std::endl;}
    ~Thread() {std::cout << "Thread destructed" << std::endl;}
    virtual void start() {this->thread = thread_t(&Thread::run, this);}
    virtual void stop()
    {
        std::cout << "unloop" << std::endl;
        std::cout << this->thread.get_id() << std::endl;

        if(this->thread.joinable())
            std::cout << " it's joinable" << std::endl;

        this->loop->break_loop(ev::ALL);

        this->thread.join();
        std::cout << "Thread " << this->thread.get_id() << " stopped" << std::endl;
    }

    void run()
    {
        this->loop = new ev::dynamic_loop(ev::AUTO);

        this->timer.set(*this->loop);
        this->timer.set<Thread, &Thread::on_timer>(this);
        this->timer.start(1, 1);

        std::cout << "Thread " << this->thread.get_id() << " started" << std::endl;

        this->loop->run(0);

        this->timer.stop();

        std::cout << "Thread " << this->thread.get_id() << " finalized" << std::endl;
    }

    void on_timer()
    {
        std::cout << "Time event" << std::endl;
    }
protected:
    thread_t thread;
    ev::dynamic_loop *loop;
    ev::timer timer;
private:
    Thread(const Thread &) = delete;
    void operator=(const Thread &) = delete;
};

class Server {
public:
    void run()
    {
        std::cout << "Start server" << std::endl;

        this->sigint.set<&Server::on_terminate_signal>();
        this->sigint.start(SIGINT);

        this->sigterm.set<&Server::on_terminate_signal>();
        this->sigterm.start(SIGTERM);

        this->sigkill.set<Server::on_terminate_signal>();
        this->sigkill.start(SIGKILL);

        std::vector<Thread*> threads;

        for(auto i = 0; i< 5; i++)
        {
            auto buf = new Thread();
            buf->start();
            threads.push_back(buf);
        }

        this->loop.run(0);

        this->sigint.stop();
        this->sigterm.stop();
        this->sigkill.stop();

        for(auto *iter: threads)
        {
            iter->stop();
            delete (&iter);
        }

        std::cout << "Gracefull exit" << std::endl;
    }
protected:
    ev::default_loop loop;
    ev::sig sigint;
    ev::sig sigkill;
    ev::sig sigterm;

    static void on_terminate_signal(ev::sig& signal, int)
    {
        signal.loop.break_loop(ev::ALL);
        signal.stop();
        std::cout << "Signal handled" << std::endl;
    }    
};

int main(int, char**)
{
    std::cout << "libev " << ev::version_major() << "." << ev::version_minor() << std::endl;
    Server server;
    server.run();

    return 0;
}

Output:

  libev 4.15
    Start server
    thread created
    thread created
    thread created
    thread created
    thread created
    Thread 139639372617472 started
    Thread 139639326365440 started
    Thread 139639337928448 started
    Thread 139639361054464 started
    Thread 139639349491456 started
    Time event
    Time event
    Time event
    Time event
    Time event
    Time event
    Time event
    Time eventTime event

    Time event
    Time event
    Time event
    Time event
    Time event
    Time event
    ^CSignal handled
    unloop
    139639372617472
     it's joinable
    Time eventTime event

    Time event
    Time event
    Time event
    Time event
    Time event
    Time event
    Time event

    Time event

compilation flags: -O0 -g -std=c++11

linker flags: -lev

UPD, source with wok code:

    #include <vector>
    #include <memory>
    #include <ev++.h>
    #include <thread>
    #include <iostream>

    class Thread {
    public:
            typedef std::thread thread_t;
            Thread(): loop(ev::AUTO) {std::cout << "thread created" << std::endl;}
            virtual ~Thread() {std::cout << "Thread destructed" << std::endl;}
            virtual void start() {this->thread = thread_t(&Thread::run, this);}
            virtual void stop()
            {
                    std::cout << "Try stopping " << this->thread.get_id() << std::endl;

                    this->stopper.send();
                    this->thread.join();
            }

            void run()
            {
                    this->timer.set(this->loop);
                    this->timer.set<Thread, &Thread::on_timer>(this);
                    this->timer.start(1, 1);

                    this->stopper.set(this->loop);
                    this->stopper.set<Thread, &Thread::on_stop>(this);
                    this->stopper.start();

                    std::cout << "Thread " << this->thread.get_id() << " started" << std::endl;

                    this->loop.run(0);

                    std::cout << "Thread " << this->thread.get_id() << " finalized" << std::endl;
            }
    protected:
            thread_t thread;
            ev::dynamic_loop loop;
            ev::timer timer;
            ev::async stopper;

            void on_timer()
            {
                    std::cout << "Time event" << std::endl;
            }

            void on_stop()
            {
                    std::cout << "On stop event " << std::endl;

                    this->stopper.stop();
                    this->timer.stop();

                    this->loop.break_loop(ev::ALL);
            }
    private:
            Thread(const Thread &) = delete;
            void operator=(const Thread &) = delete;
    };

    class Server {
    public:
            void run()
            {
                    std::cout << "Start server" << std::endl;

                    this->sigint.set<&Server::on_terminate_signal>();
                    this->sigint.start(SIGINT);

                    this->sigterm.set<&Server::on_terminate_signal>();
                    this->sigterm.start(SIGTERM);

                    this->sigkill.set<Server::on_terminate_signal>();
                    this->sigkill.start(SIGKILL);

                    std::vector<Thread*> threads;

                    for(auto i = 0; i< 5; i++)
                    {
                            auto buf = new Thread();
                            buf->start();
                            threads.push_back(buf);
                    }

                    this->loop.run(0);

                    this->sigint.stop();
                    this->sigterm.stop();
                    this->sigkill.stop();

                    for (auto &iter: threads)
                    {
                            iter->stop();
                            delete iter;
                    }

                    std::cout << "Gracefull exit" << std::endl;
            }
    protected:
            ev::default_loop loop;
            ev::sig sigint;
            ev::sig sigkill;
            ev::sig sigterm;

            static void on_terminate_signal(ev::sig& signal, int)
            {
                    signal.loop.break_loop(ev::ALL);
                    signal.stop();
                    std::cout << "Signal handled" << std::endl;
            }
    };

    int main(int, char**)
    {
            std::cout << "libev " << ev::version_major() << "." << ev::version_minor() << std::endl;
            Server server;
            server.run();

            return 0;
    }

Solution

  • Answer

    You need to stop timer before stop a loop, that contains this timer:

    move line:

    this->timer.stop();

    before this:

    this->loop->break_loop(ev::ALL);

    Dont play with raw pointer

    You can get rid of the pointer:

    ev::dynamic_loop *loop;

    this->loop = new ev::dynamic_loop(ev::AUTO);

    And make it class variable, initialized in constructor:

    ev::dynamic_loop loop;

    Thread() : loop(ev::AUTO)

    Probably this is a mistake

    delete (&iter);

    Of course, if you do not want to remove pointer to pointer to iter.