
Interrupt all threads in the Boost Asio Thread Pool


I commonly use boost::thread to run threads. I need the threads to be easy to interrupt, so I call boost::this_thread::interruption_point(). The code looks like this:

void do_long_calculations()
{
  for (...)
  {
    boost::this_thread::interruption_point();
    do_some_work();
  }
}

auto t = boost::thread(do_long_calculations);
...
t.interrupt();
t.join();

Now I need a thread pool, and I am trying to use boost::asio::thread_pool. It does not seem to have a standard way to interrupt its threads. So how can I interrupt all running threads in the pool? I need to interrupt all running jobs before destroying the thread pool.

Is it possible to implement the interruption myself? Something like this (or in any other way):

boost::asio::thread_pool p;
std::set<boost::thread::id> thread_ids;
post(p, []()
{
  thread_ids.insert(boost::this_thread::get_id());
  do_long_calculations();
});
...
for (auto id : thread_ids)
  SOME_INTERRUPTION_FUNCTION(id); // fire an event for boost::this_thread::interruption_point();
p.stop();

This way does not work:

boost::asio::thread_pool p;
std::set<boost::detail::thread_data_ptr> threads;
post(p, []()
{
  if (auto d = boost::detail::get_current_thread_data())
    threads.insert(d); // NEVER GOT HERE
  do_long_calculations();
});
...
for (auto &d : threads)
  d->interrupt();
p.stop();

Or maybe I need to use some other thread_pool-compatible interruption check instead of, or alongside, the boost::this_thread::interruption_point() call?


Solution

  • I think in reality you do not want to control threads in this way. I think a thread pool shines when it's viewed like a "team", a computational resource, that is always available, "at the ready".

    However if you want to interrupt tasks (not threads) you can.

    Do not rely on implementation details (detail::thread_data_base*, or for that matter the assumption that it will work with Asio's thread pool). You don't know the locking required or the undocumented semantics of the data members, and it may turn out that you need get_or_make_current_thread_data, make_external_thread_data, etc.

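    Instead, go with thread_pool's documented interface: attach. Calling attach() makes the calling thread join the pool, so the pool can be served by boost::thread objects that you own and can interrupt.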

    See it Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    #include <list>
    #include <mutex>
    #include <thread>
    using boost::chrono::seconds;
    using boost::this_thread::sleep_for; // interruptible
    
    static auto const now   = std::chrono::steady_clock::now;
    static auto const start = now();
    auto trace(auto const&... msg) {
        using namespace std::chrono_literals;
        static constexpr std::hash<std::thread::id> hash{};
        static std::mutex                           mx;
    
        std::lock_guard lk(mx);
        std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
                  << (now() - start) / 1.0s << "ms " // value is in seconds, despite the "ms" label
                  << std::hex << std::showbase << std::setw(2) << std::setfill('0')
                  << hash(std::this_thread::get_id()) % 256 << std::dec << " ";
        (std::cout << ... << msg) << std::endl;
    }
    
    void long_calculation() {
        try {
            trace("start long_calculation");
            sleep_for(seconds(5));
            trace("complete long_calculation");
        } catch (boost::thread_interrupted const&) {
            trace("interrupted long_calculation");
        }
    }
    
    int main() {
        trace("Start");
        boost::asio::thread_pool tp(0);
        std::list<boost::thread> threads;
    
        for (int i = 0; i < 4; ++i)
            threads.emplace_back([&] { tp.attach(); });
    
        post(tp, long_calculation);
        post(tp, long_calculation);
    
        sleep_for(seconds(2));
    
        trace("Interrupt");
        for (auto& th : threads)
            th.interrupt();
    
        tp.join(); // in case there are any Asio-native threads
        trace("Waiting...");
        tp.stop();
        for (auto& th : threads)
            if (th.joinable())
                th.join();
    
        trace("Bye");
    }
    

    Printing

       0.000ms 0xac Start
       0.002ms 0x9b start long_calculation
       0.002ms 0x8e start long_calculation
       2.002ms 0xac Interrupt
       2.002ms 0xac Waiting...
       2.003ms 0x9b interrupted long_calculation
       2.004ms 0x8e interrupted long_calculation
       2.004ms 0xac Bye
    

    Recommendation

    I still don't believe in thread interruption: I've heard only bad things about it, and supporting it leads to considerable size bloat in all Boost Thread internals (see e.g.).

    Besides, I see some non-trivial issues with controlled shutdown even with the attach approach just shown.

    I'd say checking an atomic flag or some weak_ptr in your own code is probably enough to let you interrupt your calculations. If you need to maintain support for interruption points, I'd really recommend going without Asio's thread_pool, which doesn't add value at that point:

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    #include <mutex>
    #include <thread>
    using boost::chrono::seconds;
    using boost::this_thread::sleep_for; // interruptible
    
    static auto const now   = std::chrono::steady_clock::now;
    static auto const start = now();
    auto trace(auto const&... msg) {
        using namespace std::chrono_literals;
        static constexpr std::hash<std::thread::id> hash{};
        static std::mutex                           mx;
    
        std::lock_guard lk(mx);
        std::cout << std::fixed << std::setprecision(3) << std::setfill(' ') << std::setw(8)
                  << (now() - start) / 1.0s << "ms " // value is in seconds, despite the "ms" label
                  << std::hex << std::showbase << std::setw(2) << std::setfill('0')
                  << hash(std::this_thread::get_id()) % 256 << std::dec << " ";
        (std::cout << ... << msg) << std::endl;
    }
    
    void long_calculation() {
        try {
            trace("start long_calculation");
            sleep_for(seconds(5));
            trace("complete long_calculation");
        } catch (boost::thread_interrupted const&) {
            trace("interrupted long_calculation");
        }
    }
    
    struct my_pool : boost::asio::io_context {
        my_pool(unsigned nthreads) {
            while (nthreads--)
                threads_.create_thread([this] { worker(); });
        }
    
        void join() {
            work_.reset();
            threads_.join_all();
        }
    
        void interrupt() {
            threads_.interrupt_all();
        }
    
        ~my_pool() {
            join();
        }
    
      private:
        void worker() {
            // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
            for (;;) {
                try {
                    this->run();
                    break; // exited normally
                } catch (std::exception const& e) {
                    trace("pool_worker exception: ", e.what());
                } catch (...) {
                    trace("pool_worker exception: unhandled");
                }
            }
        }
    
        boost::thread_group                             threads_;
        boost::asio::executor_work_guard<executor_type> work_{get_executor()};
    };
    
    int main() {
        trace("Start");
        my_pool tp(4);
    
        for (int i = 0; i < 5; ++i)
            post(tp, long_calculation);
    
        sleep_for(seconds(2));
    
        trace("Interrupt");
        tp.interrupt();
    
        trace("Waiting...");
        tp.join();
    
        trace("Bye");
    }
    

    Prints e.g.

       0.000ms 0x18 Start
       0.000ms 0x88 start long_calculation
       0.000ms 0x88 start long_calculation
       0.000ms 0xbe start long_calculation
       0.000ms 0x68 start long_calculation
       2.000ms 0x18 Interrupt
       2.000ms 0x18 Waiting...
       2.001ms 0x68 interrupted long_calculation
       2.001ms 0x68 start long_calculation
       2.001ms 0x88 interrupted long_calculation
       2.001ms 0xbe interrupted long_calculation
       2.001ms 0x88 interrupted long_calculation
       7.001ms 0x68 complete long_calculation
       7.001ms 0x18 Bye
    

    Note how controlled shutdown works correctly even when uninterrupted tasks are pending.

    (You can do this trivially without thread_group: http://coliru.stacked-crooked.com/a/261986d2975e1da4)
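
    The atomic-flag alternative mentioned in the recommendation can be sketched roughly as follows. This is a minimal illustration, not code from the answer: the names cancel_requested and run_and_cancel are hypothetical, and plain std::thread workers stand in for the pool so that the sketch does not depend on Boost. The same flag check could presumably be made inside tasks posted to boost::asio::thread_pool.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical task: performs `steps` small units of work, checking the
// shared flag before each one. Returns true if it ran to completion and
// false if it stopped early because cancellation was requested.
bool long_calculation(std::atomic<bool> const& cancel_requested, int steps) {
    for (int i = 0; i < steps; ++i) {
        if (cancel_requested.load())
            return false; // cooperative stand-in for interruption_point()
        // Stand-in for do_some_work() from the question.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return true;
}

// Hypothetical driver: starts `ntasks` workers, requests cancellation after
// `delay`, joins every worker and returns how many tasks were cancelled.
std::size_t run_and_cancel(std::size_t ntasks, int steps,
                           std::chrono::milliseconds delay) {
    std::atomic<bool>        cancel_requested{false};
    std::atomic<std::size_t> cancelled{0};
    std::vector<std::thread> workers;

    for (std::size_t i = 0; i < ntasks; ++i)
        workers.emplace_back([&] {
            if (!long_calculation(cancel_requested, steps))
                ++cancelled;
        });

    std::this_thread::sleep_for(delay);
    cancel_requested.store(true); // ask all running tasks to stop

    for (auto& w : workers)
        w.join();
    return cancelled.load();
}
```

    For example, run_and_cancel(4, 1000000, std::chrono::milliseconds(20)) cancels all four tasks long before they could finish. Unlike interrupt(), the flag only takes effect where the task checks it, so a task blocked in a long wait does not notice the request until it returns to the check.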