Tags: c++, linux, multithreading, boost, c++20

Boost thread crash


Can I use Boost Thread + Boost Atomic built with the C++20 flag? I didn't find anything mentioning this possibility in the Boost documentation for those libraries.

I have an application that works fine with GCC 7.1, C++17, and Boost 1.75, but after upgrading to GCC 11.1 and C++20 I get a crash inside Boost Thread.

The sanitizer does not report any issues.

The program uses boost condition_variable.
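
For reference, the build is roughly along these lines (flags are approximate, not copied verbatim from my build system):

g++-11 -std=c++20 -O2 -pthread main.cpp -lboost_thread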

Live On Compiler Explorer

#include "boost/thread/thread.hpp"
#include "boost/shared_ptr.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/utility.hpp"
#include "boost/thread/condition_variable.hpp"
#include <thread>

#include <algorithm>
#include <cassert>
#include <atomic>


#include <vector>

class Dispatcher;

class Task
{
public:
   virtual void run() = 0;

   virtual ~Task() {};
};


class TaskPool : boost::noncopyable
{
public:
   typedef boost::shared_ptr<Task>            task_ptr_type;
   typedef boost::shared_ptr<Dispatcher>  dispatcher_ptr_type;
   typedef std::vector<dispatcher_ptr_type>         thread_pool_type;
   typedef boost::posix_time::time_duration         time_duration_type;
   typedef std::size_t                              size_type;

   TaskPool(const size_type Size);

   ~TaskPool();

   size_type maxSize() const { return max_size_; }

   size_type watermark() const { return watermark_; }

   void setInactivTm(const time_duration_type& Inactivity_Time_Out);

   const time_duration_type& getInactivTm() const { return inactivity_time_out_; }

   void execute(const task_ptr_type& Task);

private:
   typedef boost::mutex            mutex_type;
   typedef mutex_type::scoped_lock lock_type;

   mutable mutex_type mutex_;

   size_type          max_size_;
   thread_pool_type * thread_pool_;

   time_duration_type inactivity_time_out_;

   size_type          watermark_;
   size_type          invocations_;
   size_type          executions_;
   size_type          loops_;
};


class Dispatcher : boost::noncopyable
{
public:
   typedef TaskPool::task_ptr_type      task_ptr_type;
   typedef TaskPool::time_duration_type time_duration_type;
   typedef TaskPool::size_type          size_type;

   Dispatcher();

   ~Dispatcher();

   void setInactivTm(const time_duration_type& Inactivity_Time_Out);

   bool waitReady(const time_duration_type& Time_Out);

   void execute(const task_ptr_type& Task);

   void terminate();

   static time_duration_type defaultActivTm()
   {
      return boost::posix_time::milliseconds( 1 );
   }

   static time_duration_type minActivTm()
   {
      return boost::posix_time::milliseconds( 1 );
   }

private:
   typedef boost::mutex              mutex_type;
   typedef boost::condition_variable condition_variable_type;
   typedef mutex_type::scoped_lock   lock_type;

   friend class Runner;

   bool Queued_() const volatile;
   bool NotQueued_() const volatile;

   void execute_();

   boost::thread thread_;
   task_ptr_type task_;

   mutable mutex_type      mutex_;
   condition_variable_type task_busy_cond_;
   condition_variable_type task_available_cond_;

   volatile bool is_terminated_;

   time_duration_type inactivity_time_out_;

   size_type invocations_;
   size_type executions_;
   size_type thread_created_;
   size_type thread_terminated_;
};


class Runner
{
public:
   explicit Runner(Dispatcher * Disp)
    : disp_( Disp )
   { }

   void operator()()
   {
      disp_->execute_();
   }

private:
   Dispatcher * const disp_;
};


Dispatcher::Dispatcher()
 : is_terminated_( false ),
   inactivity_time_out_( defaultActivTm() ),
   invocations_( 0 ),
   executions_( 0 ),
   thread_created_( 0 ),
   thread_terminated_( 0 )
{ }


Dispatcher::~Dispatcher()
{
   terminate();
}


void Dispatcher::setInactivTm(const time_duration_type& Inactivity_Time_Out)
{
   lock_type lock( mutex_ );

   inactivity_time_out_ = Inactivity_Time_Out;
   assert( inactivity_time_out_ >= minActivTm() );
}


bool Dispatcher::waitReady(const time_duration_type& Time_Out)
{
   lock_type lock( mutex_ );

   if ( !is_terminated_ &&
        (thread_.get_id() == boost::thread::id()) )
   {
      return true;
   }
   while ( Queued_() )
   {
      if ( !task_busy_cond_.timed_wait(lock,
                                       Time_Out) )
      {
         return false;
      }
   }
   return !is_terminated_;
}


void Dispatcher::execute(const task_ptr_type& Task)
{
   lock_type lock( mutex_ );

   if ( thread_.get_id() == boost::thread::id() )
   {
      //std::cout << "new thread\n";
      thread_created_ += 1;
      thread_ = boost::thread( Runner(this) );
   }
   while ( Queued_() )
   {
      task_busy_cond_.wait(lock);
   }
   if ( !is_terminated_ )
   {
      task_ = Task;
      task_available_cond_.notify_one();
   }
   invocations_ += 1;
}


void Dispatcher::terminate()
{
   is_terminated_ = true;

   thread_.interrupt();
   thread_.join();
}


bool Dispatcher::Queued_() const volatile
{
   return const_cast<const task_ptr_type&>(task_) &&
          !is_terminated_;
}


bool Dispatcher::NotQueued_() const volatile
{
   return !const_cast<const task_ptr_type&>(task_) &&
          !is_terminated_;
}


void Dispatcher::execute_()
{
   {
      lock_type lock( mutex_ );
      is_terminated_ = false;
   }

   while ( 1 )
   {
      task_ptr_type tmp_task;

      // Critical section.
      //
      {
         lock_type lock( mutex_ );

         while ( NotQueued_() )
         {
            if ( !task_available_cond_.timed_wait(lock,
                                                  inactivity_time_out_) )
            {
               thread_terminated_ += 1;
               thread_.detach();
               return;
            }
         }
         if ( is_terminated_ )
         {
            thread_terminated_ += 1;
            return;
         }
         tmp_task.swap( task_ );
         task_busy_cond_.notify_one();
      }
      // Execute task.
      //
      executions_ += 1;

      try
      {
         ////std::cout << "execution in progress\n";
         tmp_task->run();
         ////std::cout << "execution done\n";
      }
      catch (const boost::thread_interrupted&)
      {
         thread_terminated_ += 1;
         thread_.detach();
         return;
      }
      catch (...)
      {
         // Unexpected exception, ignore...
      }
   }
}


TaskPool::TaskPool(const size_type Size)
 : max_size_( Size ),
   thread_pool_( 0 ),
   inactivity_time_out_( Dispatcher::defaultActivTm() ),
   watermark_( 0 ),
   invocations_( 0 ),
   executions_( 0 ),
   loops_( 0 )
{
   assert( max_size_ > 0 );
   thread_pool_ = new thread_pool_type( max_size_ );
}


TaskPool::~TaskPool()
{
   delete thread_pool_;
}


void TaskPool::setInactivTm(const time_duration_type& Inactivity_Time_Out)
{
   lock_type lock( mutex_ );

   inactivity_time_out_ = Inactivity_Time_Out;
   assert( inactivity_time_out_ >= Dispatcher::minActivTm() );

   for (thread_pool_type::iterator iter = thread_pool_->begin();
        thread_pool_->end() != iter;
        ++iter)
   {
      dispatcher_ptr_type& p( *iter );

      if ( p )
      {
         p->setInactivTm( inactivity_time_out_ );
      }
   }
}


void TaskPool::execute(const task_ptr_type& Task)
{
   lock_type lock( mutex_ );

   invocations_ += 1;

   const time_duration_type min_iteration_timeout( boost::posix_time::microsec( 100 ) );
   const time_duration_type max_iteration_timeout( boost::posix_time::microsec( 100000 ) );

   time_duration_type timeout( 1 == max_size_ ? time_duration_type( boost::posix_time::pos_infin )
                                              : time_duration_type( boost::posix_time::microsec(0) ) );

   while ( 1 )
   {
      for (thread_pool_type::iterator iter = thread_pool_->begin();
           thread_pool_->end() != iter;
           ++iter)
      {
         dispatcher_ptr_type& p( *iter );

         loops_ += 1;

         if ( !p )
         {
            //std::cout << "new Dispatcher instance\n";
            p.reset( new Dispatcher );
            p->setInactivTm( inactivity_time_out_ );

            watermark_ = iter - thread_pool_->begin();
         }
         if ( p->waitReady( timeout ) )
         {
            p->execute( Task );
            executions_ += 1;
            return;
         }
      }
      if ( timeout != boost::posix_time::pos_infin )
      {
         timeout *= 2;

         timeout = std::max(timeout,
                            min_iteration_timeout);
         timeout = std::min(timeout,
                            max_iteration_timeout);
      }
   }
}


static TaskPool threadPool = 10;

class Wrapper : public Task
{
public:
   Wrapper()
   {
      listener = new Listener;
   }

   virtual void run()
   {
      boost::this_thread::sleep( boost::posix_time::seconds(10) );
      listener->run();
   }

   struct Listener
   {
      std::string s;
      void run()
      {
         s = "Hello";
         //std::cout << s << '\n';
      }
   };

   Listener* listener;
};

struct Executer
{
   std::vector<std::thread> threads;

   void dispatch()
   {
      //std::cout << "dispatch\n";
      for (auto i=0; i<2; ++i)
      {
         threads.push_back(std::move(std::thread([&]()
         {
            int index = 0;
            while (true)
            {
               {
                  //std::cout << "begin\n";
                  boost::shared_ptr<Wrapper> task( new Wrapper );
                  threadPool.execute( task );
                  //std::cout << "end\n";
               }

               if (index % 1000 == 0) boost::this_thread::sleep( boost::posix_time::seconds(5) );
            }
         })));
      }
   }

   ~Executer()
   {
      for (auto i=0; i<2; ++i) threads[i].join();
   }
};

int main()
{
  std::thread t1([](){Executer a; a.dispatch();});
  std::thread t2([](){Executer a; a.dispatch();});
  t1.join();
  t2.join();
}

The shared_ptr has use_count_ = 1 and weak_count_ = 1. I do not know why the weak count is not 0.

Any help on how to find the root cause?


Solution

  • We discussed this likely being down to infinite recursion with Boost Operators in C++20. Then I remarked:

    I'm very surprised about the code in combination with the compiler being c++20... The code smells "legacy" - almost Java-esque - and is just rife with raw pointer abuse and boost back-ports for stuff that has been standardized in c++11? here

    and

    Would you like to see a standard-library only version of your code? That way you can completely forget about Boost compatibility issues. here

    Here is that version: Coliru in 243 lines of code. That's 210 lines fewer than the original, and with fewer smells¹ and without Boost :)

    Note I changed the Task::run interface to take a std::stop_token, because the original code used Boost's non-standard thread-interruption. If you want to emulate the old behavior, you might add throw boost::thread_interrupted from inside the two interruptible_XXX helpers. Of course you would have to handle them at the top-level in your thread as well.

    If the interrupt was only ever used to shut down the Dispatcher loop, and was not intended to actually interact with user-supplied Task implementations, then simply remove the stop_token argument :)
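
    For illustration only, here is a minimal sketch of that emulation (not part of the rewrite below); the helper name interruptible_sleep_for_compat is made up here, and the throw of boost::thread_interrupted is the only functional addition on top of the stop_token-based helpers shown further down:

    // Sketch only: emulating Boost-style interruption on top of std::stop_token.
    // Not part of the standard-library rewrite below.
    #include <boost/thread/exceptions.hpp> // boost::thread_interrupted
    #include <condition_variable>
    #include <mutex>
    #include <stop_token>
    #include <type_traits>

    template <typename Duration>
    static void interruptible_sleep_for_compat(Duration const& dur, std::stop_token stoken) {
        std::mutex                  mx;
        std::unique_lock            lk{mx};
        std::condition_variable_any cv;
        cv.wait_for(lk, stoken, dur, std::false_type{}); // wakes early on a stop request
        if (stoken.stop_requested())
            throw boost::thread_interrupted(); // emulate the old interruption behaviour
    }

    As noted, the top-level worker loop would then need a catch (boost::thread_interrupted const&) handler, much like the original Dispatcher::execute_ has.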

    Adding some fancy tracing and limiting run-length (#define SHORT_DEMO), we get

    Live On Coliru

    #include <algorithm>
    #include <atomic>
    #include <cassert>
    #include <condition_variable>
    #include <memory>
    #include <thread>
    #include <utility>
    #include <vector>
    
    #include <iomanip>
    #include <iostream>
    using namespace std::chrono_literals;
    
    namespace { // diagnostics tracing helpers
        auto now = std::chrono::high_resolution_clock::now;
    
        static auto timestamp() {
            static auto start = now();
            return (now() - start) / 1.ms;
        }
    
        static std::atomic_int tid_gen = 0;
        thread_local int       tid     = tid_gen++;
        std::mutex             console_mx;
    
        void trace(auto const&... args) {
            std::lock_guard lk(console_mx);
            std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
            (std::cout << ... << args) << std::endl;
        }
    
        template <typename> struct CtxTracer {
            const std::string_view _ctx;
            int const              id = [] {
                static std::atomic_int idgen = 0;
                return ++idgen;
            }();
            void operator()(auto const&... args) const { ::trace(_ctx, " #", id, "\t", args...); }
        };
    
    #define TRACE_CTX(T) CtxTracer<struct Tag ## T> trace{#T};
    
    } // namespace
    
    namespace {
        // helpers to help replace boost::thread_interrupted with std::stop_token
        template <typename Lockable, typename Duration, typename Predicate>
        bool interruptible_wait_for(std::condition_variable& cv, std::unique_lock<Lockable>& lock,
                                    Duration const& dur, std::stop_token stoken, Predicate pred) {
    
            // see https://stackoverflow.com/a/66309629/85371
            std::stop_callback callback(stoken, [&cv, initial = true, &mx = *lock.mutex()]() mutable {
                if (std::exchange(initial, false)) // while constructing the callback
                    return;                        // avoid dead-lock
                mx.lock();
                mx.unlock();
                cv.notify_all();
            });
    
            cv.wait_for(lock, dur, [&] { return stoken.stop_requested() || pred(); });
            return pred();
        }
    
        template <typename Duration> // returns true if stop requested
        static bool interruptible_sleep_for(Duration const& dur, std::stop_token stoken) {
            std::mutex       mutex_;
            std::unique_lock lk{mutex_};
    #if 1
            std::condition_variable cv;
            interruptible_wait_for(cv, lk, dur, stoken, std::false_type{});
    #else
            // cleaner, but trips up threadsan in many versions
            std::condition_variable_any cv;
            cv.wait_for(lk, stoken, dur, std::false_type{});
    #endif
            return stoken.stop_requested();
        }
    } // namespace
    
    struct Task {
        virtual ~Task()                   = default;
        virtual void run(std::stop_token) = 0;
    };
    
    using mutex_type    = std::mutex;
    using cond_var_type = std::condition_variable;
    using lock_type     = std::unique_lock<mutex_type>;
    using duration_type = std::chrono::steady_clock::duration;
    using task_ptr_type = std::shared_ptr<Task>;
    
    /*
     * Conceptually a single thread that services a queue of tasks, until no task is available for a given idle timeout.
     * The queue depth is 1. That is, at most one task can be queued while at most one task is running on the thread.
     * The idle timeout can be modified during execution
     */
    class Dispatcher {
        TRACE_CTX(Dispatcher)
        Dispatcher(Dispatcher const&)            = delete;
        Dispatcher& operator=(Dispatcher const&) = delete;
    
      public:
        Dispatcher(duration_type t = default_idle_tm) : idle_timeout_(t) {}
    
        void idle_timeout(duration_type t) { idle_timeout_ = std::max(min_idle_tm, t); }
    
        // fails if queue slot taken and thread busy > timeout
        bool enqueue(duration_type timeout, task_ptr_type Task);
    
        static constexpr duration_type default_idle_tm = 1ms;
        static constexpr duration_type min_idle_tm     = 1ms;
    
      private:
        task_ptr_type pop(duration_type timeout) noexcept;
        void          worker_impl(std::stop_token stoken) noexcept;
    
        //////
        mutable mutex_type mutex_;
        cond_var_type      producers_, consumer_; // SEHE combine and `notify_all`?
        task_ptr_type      queued_;
        std::jthread       worker_; // the consumer thread
    
        //////
        std::atomic<duration_type> idle_timeout_;
        struct { std::atomic<size_t> queued, executed, created, terminated; } disp_stats;
    };
    
    bool Dispatcher::enqueue(duration_type timeout, task_ptr_type aTask) {
        lock_type lock(mutex_);
    
        if (!worker_.joinable()) {
            trace("new thread");
            disp_stats.created += 1;
            worker_ = std::jthread([this](std::stop_token stoken) { worker_impl(stoken); });
        }
    
        if (interruptible_wait_for(producers_, lock, timeout, worker_.get_stop_token(),
                                   [this] { return !queued_; })) {
            queued_.swap(aTask);
            consumer_.notify_one();
            disp_stats.queued += 1;
            return true;
        } else {
            return false;
        }
    }
    
    task_ptr_type Dispatcher::pop(duration_type timeout) noexcept {
        task_ptr_type task;
    
        lock_type lock(mutex_);
        if (interruptible_wait_for(consumer_, lock, timeout, worker_.get_stop_token(), [this] { return !!queued_; })) {
            task.swap(queued_);
            producers_.notify_one();
        }
        return task;
    }
    
    void Dispatcher::worker_impl(std::stop_token stoken) noexcept {
        duration_type cur_timeout;
        while (auto task = pop((cur_timeout = idle_timeout_))) {
            try {
                disp_stats.executed += 1;
                task->run(stoken);
            } catch (...) {
                trace("unhandled exception ignored");
            }
        }
    
        disp_stats.terminated += 1;
        trace("stopped idle thread (after ", cur_timeout / 1ms, "ms)");
    }
    
    class TaskPool {
        TRACE_CTX(TaskPool)
        TaskPool(TaskPool const&)            = delete; // noncopyable
        TaskPool& operator=(TaskPool const&) = delete; // noncopyable
    
      public:
        using dispatcher_t  = std::shared_ptr<Dispatcher>;
        using dispatchers_t = std::vector<dispatcher_t>;
    
        TaskPool(size_t capacity);
    
        size_t        maxSize() const;
        size_t        watermark() const { return tp_stats.watermark; }
        duration_type idle_timeout() const { return idle_timeout_; }
        void          idle_timeout(duration_type t);
    
        void execute(task_ptr_type const& Task);
    
      private:
        mutable mutex_type mutex_;
        dispatchers_t      dispatchers_;
        duration_type      peak_backoff_;
    
        std::atomic<duration_type> idle_timeout_ = Dispatcher::default_idle_tm;
        struct { std::atomic<size_t> watermark, invocations, executions, scans; } tp_stats;
    };
    
    TaskPool::TaskPool(size_t capacity) : dispatchers_(capacity) { assert(capacity); }
    
    void TaskPool::idle_timeout(duration_type t) {
        assert(t >= Dispatcher::min_idle_tm);
        idle_timeout_ = t;
    
        for (dispatcher_t const& p : dispatchers_)
            if (p)
                p->idle_timeout(t);
    }
    
    void TaskPool::execute(task_ptr_type const& Task) {
        lock_type lock(mutex_);
    
        bool const single = dispatchers_.size() == 1;
        tp_stats.invocations += 1;
    
        constexpr duration_type min = 100ms, max = 100s;
        for (duration_type w = !single ? 0s : 100s; /*true*/; w = clamp(w * 2, min, max)) {
            if (w > peak_backoff_) {
                trace("new peak backoff interval ", w / 1.0s);
                peak_backoff_ = w;
            }
    
            for (dispatcher_t& p : dispatchers_) {
                tp_stats.scans += 1;
    
                if (!p) {
                    p = std::make_shared<Dispatcher>(idle_timeout_);
                    tp_stats.watermark = &p - dispatchers_.data();
                    trace("new Dispatcher (watermark ", tp_stats.watermark, ")");
                }
    
                if (p->enqueue(w, Task)) {
                    tp_stats.executions += 1;
                    return;
                }
            }
        }
    }
    
    size_t TaskPool::maxSize() const {
        lock_type lock(mutex_);
        return dispatchers_.size();
    }
    
    struct Wrapper : Task {
        virtual void run(std::stop_token stoken) override {
            if (!interruptible_sleep_for(10s, stoken))
                listener.run();
        }
    
        struct Listener {
            TRACE_CTX(Listener)
            void run() { trace("Hello"); }
        };
    
        Listener listener;
    };
    
    static void Demo(TaskPool& pool) {
        TRACE_CTX(Demo)
    
        std::stop_source stop;
    
        // emulated application logic that produces tasks
        auto app_logic = [&pool, stoken = stop.get_token()] {
            TRACE_CTX(app_logic)
            for (unsigned index = 0; !stoken.stop_requested(); ++index) {
                auto s = now();
                pool.execute(std::make_shared<Wrapper>());
                trace("index:", index, " enqueued in ", (now() - s) / 1.s, "s");
    
                if (index % 20 == 0) {
                    trace("taking a break from producing tasks");
                    std::this_thread::sleep_for(5s);
                }
            }
            trace("exit app_logic");
        };
    
        trace("start");
        std::vector<std::thread> threads;
        threads.emplace_back(app_logic);
        threads.emplace_back(app_logic);
    
    #ifdef SHORT_DEMO
        std::this_thread::sleep_for(10s); // (2.5min);
        trace("Requesting shutdown for SHORT_DEMO");
        stop.request_stop();
    #endif
    
        trace("joining app_logic threads");
        for (auto& th : threads)
            th.join();
        trace("joined app_logic threads");
    }
    
    int main() {
        TRACE_CTX(Main);
    
        std::cout << std::setprecision(2) << std::fixed;
        trace("main");
    
        {
            TaskPool threadPool{10};
    
            std::thread t1(Demo, std::ref(threadPool));
            std::thread t2(Demo, std::ref(threadPool));
    
            trace("joining t1..."); t1.join();
            trace("joining t2..."); t2.join();
            trace("awaiting task pool");
        }
    
        trace("bye");
    }
    

    With output like

    g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp -DSHORT_DEMO
    ./a.out
    T: 0      0.00ms Main #1    main
    T: 0      0.17ms Main #1    joining t1...
    T: 1      0.22ms Demo #1    start
    T: 2      0.27ms Demo #2    start
    T: 3      0.48ms TaskPool #1    new Dispatcher (watermark 0)
    T: 3      0.50ms Dispatcher #1  new thread
    T: 3      0.67ms app_logic #1   index:0 enqueued in 0.00s
    T: 3      0.69ms app_logic #1   taking a break from producing tasks
    T: 4      0.72ms app_logic #2   index:0 enqueued in 0.00s
    T: 4      0.73ms app_logic #2   taking a break from producing tasks
    T: 5      0.88ms TaskPool #1    new Dispatcher (watermark 1)
    T: 5      0.90ms Dispatcher #2  new thread
    T: 5      0.97ms app_logic #3   index:0 enqueued in 0.00s
    T: 5      0.99ms app_logic #3   taking a break from producing tasks
    T: 6      1.17ms app_logic #4   index:0 enqueued in 0.00s
    T: 6      1.19ms app_logic #4   taking a break from producing tasks
    T: 4   5001.26ms TaskPool #1    new Dispatcher (watermark 2)
    T: 4   5001.33ms Dispatcher #3  new thread
    T: 4   5001.47ms app_logic #2   index:1 enqueued in 0.00s
    T: 3   5001.83ms app_logic #1   index:1 enqueued in 0.00s
    T: 5   5002.37ms TaskPool #1    new Dispatcher (watermark 3)
    T: 5   5002.42ms Dispatcher #4  new thread
    T: 5   5002.54ms app_logic #3   index:1 enqueued in 0.00s
    T: 5   5003.07ms app_logic #3   index:2 enqueued in 0.00s
    T: 4   5003.76ms TaskPool #1    new Dispatcher (watermark 4)
    T: 4   5003.77ms Dispatcher #5  new thread
    T: 4   5003.84ms app_logic #2   index:2 enqueued in 0.00s
    T: 3   5004.55ms app_logic #1   index:2 enqueued in 0.00s
    T: 6   5005.41ms TaskPool #1    new Dispatcher (watermark 5)
    T: 6   5005.43ms Dispatcher #6  new thread
    T: 6   5005.51ms app_logic #4   index:1 enqueued in 0.00s
    T: 6   5006.37ms app_logic #4   index:2 enqueued in 0.00s
    T: 4   5007.44ms TaskPool #1    new Dispatcher (watermark 6)
    T: 4   5007.46ms Dispatcher #7  new thread
    T: 4   5007.56ms app_logic #2   index:3 enqueued in 0.00s
    T: 3   5008.58ms app_logic #1   index:3 enqueued in 0.00s
    T: 5   5009.75ms TaskPool #1    new Dispatcher (watermark 7)
    T: 5   5009.77ms Dispatcher #8  new thread
    T: 5   5009.86ms app_logic #3   index:3 enqueued in 0.01s
    T: 6   5011.04ms app_logic #4   index:3 enqueued in 0.00s
    T: 4   5012.41ms TaskPool #1    new Dispatcher (watermark 8)
    T: 4   5012.43ms Dispatcher #9  new thread
    T: 4   5012.51ms app_logic #2   index:4 enqueued in 0.00s
    T: 3   5013.85ms app_logic #1   index:4 enqueued in 0.01s
    T: 5   5015.36ms TaskPool #1    new Dispatcher (watermark 9)
    T: 5   5015.38ms Dispatcher #10 new thread
    T: 5   5015.46ms app_logic #3   index:4 enqueued in 0.01s
    T: 6   5016.97ms app_logic #4   index:4 enqueued in 0.01s
    T: 6   5018.64ms TaskPool #1    new peak backoff interval 0.10
    T: 6   6020.28ms TaskPool #1    new peak backoff interval 0.20
    T: 6   8022.03ms TaskPool #1    new peak backoff interval 0.40
    T: 1  10000.67ms Demo #1    Requesting shutdown for SHORT_DEMO
    T: 1  10000.76ms Demo #1    joining app_logic threads
    T: 2  10000.81ms Demo #2    Requesting shutdown for SHORT_DEMO
    T: 2  10000.84ms Demo #2    joining app_logic threads
    T: 7  10000.87ms Listener #1    Hello
    T: 8  10001.11ms Listener #3    Hello
    T: 6  12023.81ms TaskPool #1    new peak backoff interval 0.80
    T: 6  12023.89ms app_logic #4   index:5 enqueued in 7.01s
    T: 6  12023.91ms app_logic #4   exit app_logic
    T: 3  12024.14ms app_logic #1   index:5 enqueued in 7.01s
    T: 3  12024.19ms app_logic #1   exit app_logic
    T: 9  15001.65ms Listener #6    Hello
    T:10  15002.69ms Listener #7    Hello
    T:11  15015.13ms Listener #9    Hello
    T:12  15015.17ms Listener #8    Hello
    T:13  15015.24ms Listener #13   Hello
    T:14  15015.29ms Listener #12   Hello
    T:15  15015.33ms Listener #17   Hello
    T:16  15015.59ms Listener #19   Hello
    T: 5  15015.65ms app_logic #3   index:5 enqueued in 10.00s
    T: 5  15015.67ms app_logic #3   exit app_logic
    T: 1  15015.73ms Demo #1    joined app_logic threads
    T: 0  15015.80ms Main #1    joining t2...
    T: 4  15016.00ms app_logic #2   index:5 enqueued in 10.00s
    T: 4  15016.02ms app_logic #2   exit app_logic
    T: 2  15016.11ms Demo #2    joined app_logic threads
    T: 0  15016.20ms Main #1    awaiting task pool
    T: 7  20001.13ms Dispatcher #1  stopped idle thread (after 1ms)
    T: 8  20001.31ms Listener #4    Hello
    T: 8  20013.48ms Dispatcher #2  stopped idle thread (after 1ms)
    T: 9  25001.90ms Dispatcher #3  stopped idle thread (after 1ms)
    T:10  25015.25ms Dispatcher #4  stopped idle thread (after 1ms)
    T:11  25017.66ms Listener #10   Hello
    T:12  25017.71ms Listener #15   Hello
    T:13  25017.76ms Listener #14   Hello
    T:14  25017.79ms Listener #16   Hello
    T:15  25017.84ms Listener #18   Hello
    T:16  25017.89ms Listener #20   Hello
    T:11  25018.81ms Dispatcher #5  stopped idle thread (after 1ms)
    T:13  25018.84ms Dispatcher #7  stopped idle thread (after 1ms)
    T:12  25018.88ms Dispatcher #6  stopped idle thread (after 1ms)
    T:14  25018.94ms Dispatcher #8  stopped idle thread (after 1ms)
    T:15  25019.06ms Dispatcher #9  stopped idle thread (after 1ms)
    T:16  35018.10ms Dispatcher #10 stopped idle thread (after 1ms)
    T: 0  35018.30ms Main #1    bye
    

    Design Questions

    I see a number of issues with the design even after the improvements

    • TaskPool is a fixed-capacity task queue and thread pool combined, where each "dispatcher" holds 0-2 tasks: 0 or 1 currently executing, and 0 or 1 queued_

    • There is no work stealing: each dispatcher has at most one queued slot, regardless of how many tasks are pending elsewhere

    • Enqueuing is the bottleneck. In the worst case it blocks indefinitely. The back-off schedule can make a single dispatcher enqueue wait 100s at a time, all under the TaskPool mutex; at that point the whole operation becomes effectively single-threaded (see the back-off sketch after this list).

    • The dispatcher scan always starts from the top, which is unlikely to be optimal. Consider two clients attempting to post a task (TaskPool::execute) at the same time.

      Due to the mutex, they will be fulfilled in sequence. The first will scan the full dispatchers_ list and find the soonest available slot.

      Even assuming "nice" circumstances where only one inner loop is needed (w == 0s), the other client's task will be placed immediately after, meaning that the first slots that had just been tried (and rejected because busy) are being tried again.

      Basically, TaskPool is like a control freak, insisting they're the only one who can manage tasks, but doing a poor job, and getting tired really quickly so they take increasingly long breaks.

    • In the end, the TaskPool is fixed-capacity, like 10 threads, but for some reason it is deemed "beneficial" to terminate threads when they aren't Very Busy. In effect you get strictly more overhead by having to create/terminate threads.

      By contrast, on most operating systems, threads awaiting synchronization primitives do not impose any runtime cost. Just look at the process/thread list of any running system. Right now, my system is running 1850 LWP ("light weight processes"). If they weren't effectively scheduled, that wouldn't work at all.

    • To add insult to injury, the dispatcher table is not ordered in any way, so if a dispatcher gracefully terminates because it was idle, it is immediately recreated on the next invocation, regardless of how many idle dispatchers are already sitting in the list.

    • Finally, the TaskPool ends up violating FIFO expectations.
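
    To make the back-off point concrete, here is a small standalone sketch (mine, not part of the answer's code) of the schedule produced by clamp(w * 2, 100ms, 100s) in TaskPool::execute: within about a dozen failed scans the per-dispatcher wait saturates at the 100s ceiling, and every one of those waits happens under the TaskPool mutex.

    #include <algorithm>
    #include <chrono>
    #include <iostream>
    using namespace std::chrono_literals;

    int main() {
        using duration_type = std::chrono::steady_clock::duration;
        duration_type w = 0s; // the multi-dispatcher case starts with a zero wait
        for (int failed_scans = 1; failed_scans <= 12; ++failed_scans) {
            // doubled (and clamped) after every full scan of the dispatcher table fails
            w = std::clamp<duration_type>(w * 2, 100ms, 100s);
            std::cout << "after " << failed_scans << " failed scan(s): wait "
                      << std::chrono::duration<double>(w).count() << "s per dispatcher\n";
        }
        // prints 0.1, 0.2, 0.4, ... seconds until it saturates at 100
    }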

    Given all this, I'm struggling to see in what scenario this approach could be better in any respect than a more classical task queue shared by identical worker threads (which never terminate, because they don't consume resources while idle anyway):

    • Unless at capacity, queuing would NEVER wait.
    • Even when at capacity, waiting would be optimal, because instead of arbitrarily picking a dispatcher and waiting increasingly long for that particular thread to maybe become available (for... queuing, not even executing), you can now just block exactly until any thread dequeues a task.
    • At the same time, queue capacity is no longer artificially bound to the number of worker threads

    Alternative Design

    This is the alternative design, dimensioned to the same capacity:

    Live On Coliru

    #define SHORT_DEMO
    #include <atomic>
    #include <cassert>
    #include <condition_variable>
    #include <deque>
    #include <iomanip>
    #include <iostream>
    #include <thread>
    #include <utility>
    using namespace std::chrono_literals;
    
    namespace { // diagnostics tracing helpers
        auto now = std::chrono::high_resolution_clock::now;
        static auto timestamp() {
            static auto start = now();
            return (now() - start) / 1.ms;
        }
    
        static std::atomic_int tid_gen = 0;
        thread_local int       tid     = tid_gen++;
        std::mutex             console_mx;
    
        void trace(auto const&... args) {
            std::lock_guard lk(console_mx);
            std::cout << "T:" << std::setw(2) << tid << std::right << std::setw(10) << timestamp() << "ms ";
            (std::cout << ... << args) << std::endl;
        }
    
        template <typename> struct CtxTracer {
            const std::string_view _ctx;
            int const              id = [] {
                static std::atomic_int idgen = 0;
                return ++idgen;
            }();
            void operator()(auto const&... args) const { ::trace(_ctx, " #", id, "\t", args...); }
        };
    
        #define TRACE_CTX(T) CtxTracer<struct Tag ## T> trace{#T};
    } // namespace
    
    namespace {
        // helpers to help replace boost::thread_interrupted with std::stop_token
        template <typename Lockable, typename Duration, typename Predicate>
        bool interruptible_wait_for(std::condition_variable& cv, std::unique_lock<Lockable>& lock,
                                    Duration const& dur, std::stop_token stoken, Predicate pred) {
    
            // see https://stackoverflow.com/a/66309629/85371
            std::stop_callback callback(stoken, [&cv, initial = true, &mx = *lock.mutex()]() mutable {
                if (std::exchange(initial, false)) // while constructing the callback
                    return;                        // avoid dead-lock
                mx.lock();
                mx.unlock();
                cv.notify_all();
            });
    
            cv.wait_for(lock, dur, [&] { return stoken.stop_requested() || pred(); });
            return pred();
        }
    
        template <typename Duration> // returns true if stop requested
        static bool interruptible_sleep_for(Duration const& dur, std::stop_token stoken) {
            std::mutex              mutex_;
            std::unique_lock lk{mutex_};
            std::condition_variable cv;
            interruptible_wait_for(cv, lk, dur, stoken, std::false_type{});
    
            return stoken.stop_requested();
        }
    } // namespace
    
    struct Task {
        virtual ~Task()                   = default;
        virtual void run(std::stop_token) = 0;
    };
    class TaskPool {
        TRACE_CTX(TaskPool)
        static constexpr std::chrono::steady_clock::duration externity = 999'999h; // duration::max() gives overflows in some implementations
    
      public:
        using task_ptr = std::shared_ptr<Task>;
    
        TaskPool(size_t capacity);
        ~TaskPool() noexcept;
    
        size_t maxSize()   const { return capacity_;  }
        size_t watermark() const { return watermark_; }
    
        void execute(task_ptr Task);
    
      private:
        mutable std::mutex      mutex_;
        std::condition_variable producers_, consumers_; // SEHE combine and `notify_all`?
    
        size_t const            capacity_;
        std::stop_source        stop_source_;
        std::deque<std::thread> workers_; // workers
        std::deque<task_ptr>    queue_;
    
        // former Dispatcher implementation
        task_ptr pop() noexcept;
        void     worker_impl(std::stop_token stoken) noexcept;
        size_t watermark_ = 0, invocations_ = 0, executed_ = 0;
    };
    
    TaskPool::TaskPool(size_t capacity) : capacity_(capacity) {
        assert(capacity);
        while (capacity--) // assuming same number of workers as queue capacity, for comparability with old design
            workers_.emplace_back(&TaskPool::worker_impl, this, stop_source_.get_token());
    }
    
    TaskPool::~TaskPool() noexcept {
        stop_source_.request_stop();
        for (auto& w : workers_)
            if (w.joinable())
                w.join();
    }
    
    void TaskPool::execute(task_ptr task) {
        std::unique_lock lock(mutex_);
        if (interruptible_wait_for(producers_, lock, externity, stop_source_.get_token(),
                                     [this] { return queue_.size() < capacity_; })) {
            queue_.push_back(std::move(task));
            consumers_.notify_one();
    
            invocations_ += 1;
            watermark_ = std::max(watermark_, queue_.size());
        } // else: stop was requested
    }
    
    TaskPool::task_ptr TaskPool::pop() noexcept {
        task_ptr task;
        std::unique_lock lock(mutex_);
        if (interruptible_wait_for(consumers_, lock, externity, stop_source_.get_token(),
                                   [this] { return !queue_.empty(); })) {
            task.swap(queue_.front());
            queue_.pop_front();
            producers_.notify_one();
        }
        return task;
    }
    
    void TaskPool::worker_impl(std::stop_token stoken) noexcept {
        while (auto task = pop())
            try {
                executed_ += 1;
                task->run(stoken);
            } catch (...) { trace("unhandled exception ignored"); }
        trace("worker exit");
    }
    
    struct Wrapper : Task {
        virtual void run(std::stop_token stoken) override {
            if (!interruptible_sleep_for(10s, stoken))
                listener.run();
        }
    
        struct Listener {
            TRACE_CTX(Listener)
            void run() { trace("Hello"); }
        };
        Listener listener;
    };
    
    static void Demo(TaskPool& pool) {
        TRACE_CTX(Demo)
        std::stop_source stop;
    
        // emulated application logic that produces tasks
        auto app_logic = [&pool, stoken = stop.get_token()] {
            TRACE_CTX(app_logic)
            for (unsigned index = 0; !stoken.stop_requested(); ++index) {
                auto s = now();
                pool.execute(std::make_shared<Wrapper>());
                trace("index:", index, " enqueued in ", (now() - s) / 1.s, "s");
    
                if (index % 20 == 0) {
                    trace("taking a break from producing tasks");
                    std::this_thread::sleep_for(5s);
                }
            }
            trace("exit app_logic");
        };
    
        trace("start");
        std::deque<std::thread> threads;
        threads.emplace_back(app_logic);
        threads.emplace_back(app_logic);
    
    #ifdef SHORT_DEMO
        std::this_thread::sleep_for(10s); // (2.5min);
        trace("Requesting shutdown for SHORT_DEMO");
        stop.request_stop();
    #endif
    
        trace("joining app_logic threads");
        for (auto& th : threads)
            th.join();
        trace("joined app_logic threads");
    }
    
    int main() {
        TRACE_CTX(Main);
    
        std::cout << std::setprecision(2) << std::fixed;
        trace("main");
    
        {
            TaskPool threadPool{10};
    
            std::thread t1(Demo, std::ref(threadPool));
            std::thread t2(Demo, std::ref(threadPool));
    
            trace("joining t1..."); t1.join();
            trace("joining t2..."); t2.join();
            trace("awaiting task pool");
        }
    
        trace("bye");
    }
    

    Note that it completes a full 10s earlier, despite generating the same amount of work with the same spacing, and having an identical number of workers and the same queue capacity. We also lost an entire type (Dispatcher) and a lot of complexity.

    Conclusion / Summary

    I may have suffered from a lack of imagination when thinking of loads that benefit from the specific queuing semantics exhibited by the original design. However, I listed a fair number of objective problems. Also, if the design was intentional, I feel there was at least a lack of clear naming and (self)documentation.

    Regardless, I hope the two approaches help you along. Compare the behaviors and choose what's best for you.


    ¹ (too many classes not pulling their weight, conflated classes (Runner and Dispatcher are conjoined twins), unnecessary use of raw pointers, volatile and const_cast abuse...).