
Generic thread pool class is not working properly


I am trying to make a thread pool class that receives several functions and puts them into a queue until they finish. Afterwards I can add other functions and reuse the threads that were already created, instead of creating new threads every time I want to run something. That's why I included a condition variable to synchronize all the threads.

However, the code is not working properly: somehow, when the function is invoked, the object gets copied. After several attempts, I cannot figure out what I am missing!

What I expect is that the member function greetings of the hw object executes in parallel, once for each index. But when the line (o.*f)(std::forward<Args>(args)...); is executed, the object appears to be copied, even though the copy constructor is deleted. So when execution enters the greetings member, it produces a SEGMENTATION FAULT.

CMakeLists.txt

cmake_minimum_required(VERSION 3.5)

project(boost_asyo LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_executable(boost_asyo main.cpp)
target_link_libraries(${PROJECT_NAME} boost_thread boost_system)

main.cpp

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

#include <mutex>
#include <condition_variable>

class Semaphore
{
    std::mutex lock;

    std::condition_variable cond;

    int count;

public:

    Semaphore()
    {
        count = 0;
    }

    void wait()
    {
        std::unique_lock<std::mutex> m(lock);

        while(count > 0)
            cond.wait(m, [this]{ return count == 0; });
    }

    void take()
    {
        std::unique_lock m(lock);

        count++;
    }

    void give()
    {
        std::unique_lock m(lock);

        count--;

        if(count == 0)
        {
            cond.notify_one();
        }
    }
};


class ThreadPool
{
private:
    boost::asio::io_service m_io_service;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    boost::thread_group m_threads;
    Semaphore m_sem;

public:
    ThreadPool(size_t n)
    {
        this->m_work = std::make_unique<boost::asio::io_service::work>(m_io_service);

        for (size_t ii = 0; ii < n; ii++)
        {
            m_threads.create_thread(boost::bind(&boost::asio::io_service::run, &this->m_io_service));
        }
    }

    ThreadPool(const ThreadPool & v) = delete;
    ThreadPool(ThreadPool && v) = delete;

    ~ThreadPool()
    {
        m_io_service.stop();
    }

    template<class type, class T, class T1, class... Args>
    auto post(type T::*f, T1 &obj, Args... args)
    {
        this->m_sem.take();
        this->m_io_service.post([&] ()
        {
            T o = static_cast<T&&>(obj);

            (o.*f)(std::forward<Args>(args)...);
            this->m_sem.give();
        });

    }

    void wait()
    {
        this->m_sem.wait();
    }
};

class HelloWorld
{
private:

public:
    std::string m_str;
    HelloWorld(std::string str) : m_str(str) {};
    HelloWorld(const HelloWorld& v) = delete;
    HelloWorld(HelloWorld&& v) = default;

    ~HelloWorld() = default;

    void greetings(int ii)
    {
        for (int jj = 0; jj < 5; jj++)
        {
            std::cout << this->m_str << " " << ii <<  std::endl;

            boost::this_thread::sleep_for(boost::chrono::seconds(1));
        }

    }
};


int main()
{
    ThreadPool tp(8);

    HelloWorld hw("Hola mundo");

    for (int ii = 0; ii < 5; ii++)
    {
        tp.post(&HelloWorld::greetings, hw, ii);
    }

    tp.wait();

    return 0;
}

This code is based on the following program, which works properly. It is similar to what I want to do, but with classes and member functions.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

#include <mutex>
#include <condition_variable>

class Semaphore
{
    std::mutex lock;

    std::condition_variable cond;

    int count;

public:

    Semaphore()
    {
        count = 0;
    }

    void wait()
    {
        std::unique_lock<std::mutex> m(lock);

        while(count > 0)
            cond.wait(m, [this]{ return count == 0; });
    }

    void take()
    {
        std::unique_lock m(lock);

        count++;
    }

    void give()
    {
        std::unique_lock m(lock);

        count--;

        if(count == 0)
        {
            cond.notify_one();
        }
    }
};


int main()
{    
    boost::asio::io_service io_service;
    std::unique_ptr<boost::asio::io_service::work> work = std::make_unique<boost::asio::io_service::work>(io_service);

    boost::thread_group threads;

    for (size_t ii = 0; ii < 2; ii++)
    {
        std::cout << "id: " << ii << std::endl;
        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
    }

    Semaphore sem;

    for (size_t ii = 0; ii < 3; ii++)
    {
        //Take
        sem.take();

        io_service.post([ii, &sem] ()
        {
            int id = 0;
            while(id < 5)
            {
                id++;
                printf("hello world %i\n", static_cast<int>(ii));
                boost::this_thread::sleep_for(boost::chrono::seconds(1));
            }

            //Give
            sem.give();
        });
    }


    sem.wait();


    for (size_t ii = 0; ii < 3; ii++)
    {
        sem.take();

        io_service.post([ii, &sem] ()
        {
            int id = 0;
            while(id < 5)
            {
                id++;
                printf("bye world %i\n", static_cast<int>(ii));
                boost::this_thread::sleep_for(boost::chrono::seconds(1));
            }
            sem.give();
        });
    }

    sem.wait();

    io_service.stop();

    return 0;
}



Solution

  • I'm really curious what the semaphore is about.

    io_service is already a task queue. It's thread-safe, so you don't need a semaphore.

    For comparison, here's an io_service-based thread pool:

    (Even better, recent Asio versions have a built-in thread_pool.)

    Where Is The Error

    This is unsafe:

    template <class type, class T, class T1, class... Args>
    auto post(type T::*f, T1& obj, Args... args) {
        this->m_sem.take();
        this->m_io_service.post([&]() {
            T o = static_cast<T&&>(obj);
            (o.*f)(std::forward<Args>(args)...);
            this->m_sem.give();
        });
    }
    

    Specifically:

    1. The line

      T o = static_cast<T&&>(obj);
      

      doesn't copy T (which is HelloWorld). You knew that because that wouldn't be possible. What happens is WORSE: the object is MOVED from obj.

      Incidentally, this assumes that T is move-constructible from T1.

      You specifically ask for it by explicitly casting the right-hand side to an rvalue reference.

      This is what std::move is specified to do, actually: "In particular, std::move produces an xvalue expression that identifies its argument t. It is exactly equivalent to a static_cast to an rvalue reference type."

      The effect is that the HelloWorld instance in main is no longer valid, yet you keep moving from it for subsequent tasks.

    2. The other arguments are captured by reference. Since post returns right after queuing the lambda, they can go out of scope before the task actually executes (this includes f).
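Both problems can be reproduced without Boost. The sketch below is std-only and uses hypothetical names (Widget, steal, make_task). Widget stands in for HelloWorld but holds a std::unique_ptr, because a moved-from unique_ptr is guaranteed to be null, which makes the moved-from state observable. steal() repeats the `T o = static_cast<T&&>(obj);` line, and make_task() shows the by-value capture that keeps a deferred task valid after its caller returns.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for HelloWorld: move-only, like the original class.
struct Widget {
    std::unique_ptr<std::string> text;
    explicit Widget(std::string s) : text(std::make_unique<std::string>(std::move(s))) {}
    Widget(const Widget&) = delete;
    Widget(Widget&&) = default;
};

// std::move is specified as a static_cast to an rvalue reference,
// so both expressions have the same type: Widget&&.
static_assert(std::is_same_v<decltype(static_cast<Widget&&>(std::declval<Widget&>())),
                             decltype(std::move(std::declval<Widget&>()))>);

// Mirrors `T o = static_cast<T&&>(obj);`: this MOVES out of obj, it does not copy.
Widget steal(Widget& obj) {
    Widget o = static_cast<Widget&&>(obj);
    return o;
}

// Safe deferred task: s and i are copied into the closure, so it stays valid
// after make_task returns. With [&] the closure would hold dangling references.
std::function<std::string()> make_task(std::string s, int i) {
    return [s, i] { return s + " " + std::to_string(i); };
}
```

Calling steal(hw) a second time would hand out a Widget whose text is null, and dereferencing it would be undefined behavior; the question's loop likewise moves from hw again for every task after the first.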

    To make this safe, you have to capture the arguments as local copies:

    template <class type, class T, class... Args>
    auto post(type T::*f, T&& obj, Args... args) {
        this->m_sem.take();
        this->m_io_service.post([=, o = std::move(obj)]() mutable {
            try {
                (o.*f)(args...);
            } catch (...) {
                this->m_sem.give();
                throw;
            }
            this->m_sem.give();
        });
    }
    

    Notes:

    1. Now obj is taken by rvalue reference. This means that post won't compile unless obj is an rvalue.

      Note that this doesn't behave as a universal reference, because T is also deduced from f.

    2. The lambda is now mutable (otherwise only const member functions could be called on the captured o).

    3. All other args are copied. This is roughly how std::bind operates, but you could optimize for movable arguments.

    4. We handle exceptions: in your code, if f threw, you would never give() the semaphore.
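Note 4 can also be handled with RAII instead of catch-and-rethrow: a small guard whose destructor calls give(), so the count is released on normal return and on exceptions alike. This swaps in a different technique from the answer's try/catch. The sketch is std-only; Counter (a stand-in for the Semaphore), GiveOnExit and run_task are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Minimal stand-in for the Semaphore: counts outstanding tasks.
class Counter {
    std::mutex m_;
    std::condition_variable cv_;
    int count_ = 0;

public:
    void take() { std::lock_guard<std::mutex> lk(m_); ++count_; }
    void give() {
        std::lock_guard<std::mutex> lk(m_);
        if (--count_ == 0) cv_.notify_all();
    }
    void wait() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return count_ == 0; });
    }
    int pending() { std::lock_guard<std::mutex> lk(m_); return count_; }
};

// Calls give() when it goes out of scope, whether the task returned or threw.
struct GiveOnExit {
    Counter& c;
    ~GiveOnExit() { c.give(); }
};

// Runs a task the way a pool worker would, releasing the counter in all cases.
template <class F>
void run_task(Counter& c, F&& f) {
    GiveOnExit guard{c};
    f();  // if this throws, guard's destructor still runs during unwinding
}
```

The guard removes the duplicated give() call from both the catch block and the normal path.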

    Of course, main needs to adapt so multiple HelloWorld instances are actually created and passed by rvalue:

    for (int ii = 0; ii < 5; ii++) {
        HelloWorld hw("Hola mundo");
        tp.post(&HelloWorld::greetings, std::move(hw), ii);
    }
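Note 1 can be checked at compile time. The sketch below uses a hypothetical post_like function with the same parameter shape as the fixed post. For an lvalue argument, obj deduces T as W& while f deduces T as W; the deductions conflict and the call is rejected. For an rvalue both deduce W, which is why main has to pass std::move(hw). W and accepts are also hypothetical.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

struct W {
    int last = -1;
    void greet(int i) { last = i; }
};

// Same parameter shape as the fixed post(): `type T::*f, T&& obj, Args... args`.
template <class type, class T, class... Args>
void post_like(type T::*f, T&& obj, Args... args) {
    (obj.*f)(args...);
}

// Detection idiom: is post_like(&W::greet, <expression of type Obj>, 0) a valid call?
template <class Obj, class = void>
struct accepts : std::false_type {};
template <class Obj>
struct accepts<Obj, std::void_t<decltype(post_like(&W::greet, std::declval<Obj>(), 0))>>
    : std::true_type {};

static_assert(accepts<W>::value, "rvalues are accepted");
static_assert(!accepts<W&>::value, "lvalues are rejected: T is deduced twice, differently");
```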
    

    BUT - IT WON'T WORK

    At least for me, it doesn't compile. Asio requires handlers to be copyable (see "Why must a Boost.Asio handler be copy constructible?" and "How to trick boost::asio to allow move-only handlers").
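std::function has the same copyability requirement, which makes the problem easy to see without Asio. One common workaround is to park the move-only callable in a std::shared_ptr so that the wrapper itself becomes copyable. A std-only sketch; make_copyable and make_move_only_task are hypothetical helpers, not Asio APIs:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

// Wraps a move-only callable in a shared_ptr; every copy of the returned
// lambda refers to the same underlying callable.
template <class F>
auto make_copyable(F&& f) {
    auto p = std::make_shared<std::decay_t<F>>(std::forward<F>(f));
    return [p](auto&&... args) -> decltype(auto) {
        return (*p)(std::forward<decltype(args)>(args)...);
    };
}

// A task owning a move-only resource, like a lambda that owns a HelloWorld.
inline auto make_move_only_task() {
    return [s = std::make_unique<std::string>("Hola mundo")] { return *s; };
}

static_assert(!std::is_copy_constructible_v<decltype(make_move_only_task())>,
              "a lambda capturing a unique_ptr is move-only");
```

The trade-off is that all copies share one callable, which is acceptable when it is only invoked once, as Asio does with handlers.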

    Also, we have hardly scratched the surface. By hardcoding the type T::*f, you made it so that you need new post overloads for many things: static methods, const member functions, ...

    Instead, why not do it the C++ way:

    template <class F, class... Args>
    auto post(F&& f, Args&&... args) {
        this->m_sem.take();
        this->m_io_service.post(
            [this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)] 
            {
                try { f(); }
                catch (...) {
                    this->m_sem.give();
                    throw;
                }
                this->m_sem.give();
            });
    }
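The generic signature covers those cases because std::bind calls its stored callable with INVOKE semantics, which treat member-function pointers (const or not), static member functions and lambdas uniformly. A std-only sketch; Greeter and run_later are hypothetical, and run_later only mimics the bind step of post(), not the queuing:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

struct Greeter {
    std::string name;
    std::string greet(int i) { return name + " " + std::to_string(i); }
    std::string greet_const(int i) const { return name + " (const) " + std::to_string(i); }
    static std::string greet_static(int i) { return "static " + std::to_string(i); }
};

// Mimics the generic post(): bind the callable and its arguments now, call later.
template <class F, class... Args>
auto run_later(F&& f, Args&&... args) {
    return std::bind(std::forward<F>(f), std::forward<Args>(args)...);
}
```

One template handles all four kinds of callable, where the T::*f version would need separate overloads.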
    

    Actually, in more modern C++ you'd write (assuming C++17 here):

        //...
            [this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] 
            {
                try { std::apply(f, args); }
        //...
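std::apply passes the tuple elements on to std::invoke, so a member-function pointer can be the callable and the object can be the first tuple element. Because that lambda is not mutable, the captured args tuple is const inside the call, so only const member functions can be invoked on the stored object; this appears to be why greetings is declared const in the full demo. A std-only sketch; Speaker and make_deferred are hypothetical names:

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <utility>

struct Speaker {
    std::string name;
    std::string say(int i) const { return name + " " + std::to_string(i); }
};

// Same capture pattern as the C++17 post(): callable and argument tuple stored by value.
template <class F, class... Args>
auto make_deferred(F&& f, Args&&... args) {
    return [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...)] {
        // Not mutable: `args` is const here, so say() must be a const member function.
        return std::apply(f, args);
    };
}
```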
    

    Oh, and we still need

    #define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
    

    because of the move-only handler type

    Full Fixed Version Demo

    NOTE: Also added an output mutex (s_outputmx) to avoid intermixed console output.

    Live On Coliru

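    #define BOOST_ASIO_DISABLE_HANDLER_TYPE_REQUIREMENTS 1
    #include <iostream>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <functional> // std::bind
    #include <memory>
    #include <string>
    #include <tuple>      // std::make_tuple, std::apply
    
    #include <mutex>
    #include <condition_variable>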
    
    class Semaphore {
        std::mutex lock;
        std::condition_variable cond;
        int count;
      public:
        Semaphore() { count = 0; }
        void wait() {
            std::unique_lock<std::mutex> m(lock);
            while (count > 0)
                cond.wait(m, [this] { return count == 0; });
        }
        void take() {
            std::unique_lock m(lock);
            count++;
        }
        void give() {
            std::unique_lock m(lock);
            count--;
            if (count == 0) {
                cond.notify_one();
            }
        }
    };
    
    
    class ThreadPool {
      private:
        boost::asio::io_service m_io_service;
        std::unique_ptr<boost::asio::io_service::work> m_work;
        boost::thread_group m_threads;
        Semaphore m_sem;
    
      public:
        ThreadPool(size_t n) {
            this->m_work =
                std::make_unique<boost::asio::io_service::work>(m_io_service);
            for (size_t ii = 0; ii < n; ii++) {
                m_threads.create_thread(boost::bind(&boost::asio::io_service::run,
                                                    &this->m_io_service));
            }
        }
        ThreadPool(const ThreadPool& v) = delete;
        ThreadPool(ThreadPool&& v) = delete;
        ~ThreadPool() { m_io_service.stop(); }
    
        template <class F, class... Args>
        auto post(F&& f, Args&&... args) {
            this->m_sem.take();
            this->m_io_service.post(
    #if 1 // pre-c++17
                [this, f=std::bind(std::forward<F>(f), std::forward<Args>(args)...)] 
                {
                    try { f(); }
    #else // https://en.cppreference.com/w/cpp/utility/apply
                [this, f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] 
                {
                    try { std::apply(f, args); }
    #endif
                    catch (...) {
                        this->m_sem.give();
                        throw;
                    }
                    this->m_sem.give();
                });
        }
    
    
        void wait() { this->m_sem.wait(); }
    };
    
    struct HelloWorld {
        std::string m_str;
    
        HelloWorld(std::string str) : m_str(str){};
        HelloWorld(const HelloWorld& v) = delete;
        HelloWorld(HelloWorld&& v) = default;
        ~HelloWorld() = default;
    
        void greetings(int ii) const {
            for (int jj = 0; jj < 5; jj++) {
                {
                    static std::mutex s_outputmx;
                    std::lock_guard<std::mutex> lk(s_outputmx);
                    std::cout << this->m_str << " " << ii << std::endl;
                }
                boost::this_thread::sleep_for(boost::chrono::seconds(1));
            }
        }
    };
    
    int main()
    {
        ThreadPool tp(8);
    
        for (int ii = 0; ii < 5; ii++) {
            HelloWorld hw("Hola mundo");
            tp.post(&HelloWorld::greetings, std::move(hw), ii);
        }
    
        tp.wait();
    }
    

    Prints

    Hola mundo 0
    Hola mundo 2
    Hola mundo 3
    Hola mundo 1
    Hola mundo 4
    Hola mundo 0
    Hola mundo 1
    Hola mundo 4
    Hola mundo 2
    Hola mundo 3
    Hola mundo 0
    Hola mundo 1
    Hola mundo 4
    Hola mundo 2
    Hola mundo 3
    Hola mundo 0
    Hola mundo 4
    Hola mundo 2
    Hola mundo 3
    Hola mundo 1
    Hola mundo 0
    Hola mundo 4
    Hola mundo 2
    Hola mundo 1
    Hola mundo 3
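The output mutex (s_outputmx) is what keeps each of those lines whole when several workers print at once. A std-only sketch of the same idea that writes to a shared std::ostringstream instead of std::cout so the result can be checked; log_lines is a hypothetical helper:

```cpp
#include <cassert>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Each worker writes whole lines; holding the mutex for the entire `<<` chain
// means no other worker can insert text in the middle of a line.
std::string log_lines(int workers, int lines_per_worker) {
    std::mutex outputmx;
    std::ostringstream out;
    std::vector<std::thread> threads;
    for (int w = 0; w < workers; ++w) {
        threads.emplace_back([&, w] {
            for (int j = 0; j < lines_per_worker; ++j) {
                std::lock_guard<std::mutex> lk(outputmx);
                out << "Hola mundo " << w << '\n';
            }
        });
    }
    for (auto& t : threads) t.join();
    return out.str();
}
```

The order of lines still varies from run to run, just like the output above; only their integrity is guaranteed.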
    

    BONUS: Drop semaphore

    Dropping the semaphore and actually using work:

    class ThreadPool {
        boost::asio::io_service m_io_service;
        std::unique_ptr<boost::asio::io_service::work> m_work;
        boost::thread_group m_threads;
    
      public:
        ThreadPool(size_t n)
            : m_work(std::make_unique<boost::asio::io_service::work>(m_io_service))
        {
            while (n--) {
                m_threads.create_thread([this] { m_io_service.run(); });
            }
        }
    
        ~ThreadPool() { wait(); }
    
        void wait() {
            m_work.reset();
            m_threads.join_all();
        }
    
        template <class F, class... Args> void post(F&& f, Args&&... args) {
            m_io_service.post(
                [f=std::forward<F>(f), args=std::make_tuple(std::forward<Args>(args)...)] {
                    std::apply(f, args); 
                });
        }
    };
    

    That's 28 lines of code, compared to 90 lines in your original. And it actually does more things.

    See it Live On Coliru as well.
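For readers without Boost, the same shape (workers keep running while a "work" token exists; wait() releases it and joins) can be sketched with the standard library alone. This is a hypothetical SimplePool, not Asio's implementation: a mutex-protected queue of std::function<void()> plays the role of io_service, and the keep_running_ flag plays the role of io_service::work. Because it uses std::function, tasks must be copyable.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

class SimplePool {
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool keep_running_ = true;  // analogue of holding an io_service::work
    std::vector<std::thread> threads_;

    void worker() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return !tasks_.empty() || !keep_running_; });
                if (tasks_.empty()) return;  // queue drained and no "work" left: exit, like run()
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

public:
    explicit SimplePool(std::size_t n) {
        while (n--) threads_.emplace_back([this] { worker(); });
    }
    ~SimplePool() { wait(); }

    // Like m_work.reset() + join_all(): let workers drain the queue, then join them.
    void wait() {
        {
            std::lock_guard<std::mutex> lk(m_);
            keep_running_ = false;
        }
        cv_.notify_all();
        for (auto& t : threads_)
            if (t.joinable()) t.join();
    }

    template <class F, class... Args>
    void post(F&& f, Args&&... args) {
        {
            std::lock_guard<std::mutex> lk(m_);
            tasks_.emplace([f = std::forward<F>(f),
                            args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
                std::apply(f, args);
            });
        }
        cv_.notify_one();
    }
};
```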

    What's Left?

    We didn't handle exceptions from io_service::run properly (see "Should the exception thrown by boost::asio::io_service::run() be caught?").
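One common pattern is to call run() in a loop inside each worker and catch exceptions there, so a throwing handler is logged instead of ending the thread. A std-only sketch of that loop over a plain task queue; run_queue is a hypothetical stand-in for io_service::run() and run_until_done for the worker body (with Asio you would wrap the real run() call):

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for io_service::run(): runs queued tasks and lets an
// exception from a task propagate to the caller, leaving the rest queued.
void run_queue(std::queue<std::function<void()>>& tasks) {
    while (!tasks.empty()) {
        auto task = std::move(tasks.front());
        tasks.pop();
        task();  // may throw
    }
}

// Worker loop: keep calling run_queue() until it returns normally,
// recording each exception instead of letting it terminate the worker.
std::vector<std::string> run_until_done(std::queue<std::function<void()>>& tasks) {
    std::vector<std::string> errors;
    for (;;) {
        try {
            run_queue(tasks);
            break;  // normal return: nothing left to do
        } catch (const std::exception& e) {
            errors.push_back(e.what());  // log, then resume with the remaining tasks
        }
    }
    return errors;
}
```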

    Also, if you have "recent" Boost, you can enjoy an improved interface to work (make_work_guard and .reset() so you don't need unique_ptr), and a ready-made thread_pool (so you don't need ... basically anything anymore):

    Live On Coliru

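    #include <boost/asio.hpp>
    #include <chrono>     // std::chrono_literals
    #include <functional> // std::bind
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <thread>     // std::this_thread::sleep_for
    static std::mutex s_outputmx;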
    using namespace std::chrono_literals;
    
    struct HelloWorld {
        std::string const m_str;
        void greetings(int ii) const;
    };
    
    int main() {
        boost::asio::thread_pool tp(8);
    
        for (int ii = 0; ii < 5; ii++)
            //post(tp, [hw=HelloWorld{"Hola mundo"}, ii] { hw.greetings(ii); });
            post(tp, std::bind(&HelloWorld::greetings, HelloWorld{"Hola mundo"}, ii));
    
        tp.join();
    }
    
    void HelloWorld::greetings(int ii) const {
        for (int jj = 0; jj < 5; jj++) {
            std::this_thread::sleep_for(1s);
    
            std::lock_guard<std::mutex> lk(s_outputmx);
            std::cout << m_str << " " << ii << std::endl;
        }
    }