c++ multithreading qt signals-slots

Launching multiple threads and restarting them


I'm trying to build a system that creates x worker threads. The threads will finish their work at different times. When any of them completes, I will examine its output and restart it, keeping the number of running threads around x. I will do this for an arbitrary number of iterations. So, basically, a controller thread will launch x threads and restart each one as it finishes, until some number of iterations is reached.

Additional note #1: By "restart" I don't necessarily mean reusing the same thread. It's perfectly fine to wait until the current one exits or aborts and is destroyed, and then create a new one. I'm mostly interested in doing this in a clean, asynchronous way.

Note: I'm not looking for any specific code, but some possible pseudo code and a design pattern making use of slots and signals.

I'm aware of Qt threads and have worked with them. I'm familiar with examples where you launch x threads and wait until all of them finish using yield and wait. I'm looking for a clean way to achieve what I described in the first paragraph using signals and slots.


Solution

  • That's what QtConcurrent::run() or QThreadPool::start() are for. The Concurrent framework uses a thread pool internally, so they're quite equivalent: the former is a convenience wrapper for the latter. The default thread pool is best left for short-running tasks; to run long tasks, use your own thread pool. You'd pass it to QtConcurrent::run() as the first argument.

    The QThreadPool maintains a queue of work items, dispatches them to threads, and dynamically creates and destroys worker threads. It's a wonderful class that you don't have to reimplement yourself.

    If you don't have too many units of work and can furnish them all upfront, simply use QtConcurrent::run() or QThreadPool::start() to queue them all ahead of time. They can emit a signal from a helper object to notify you as each of them finishes.

    If the units of work are too expensive to create all at once, you'll have to implement a notifying work queue on top of a thread pool.

    The unit of work needs to notify the queue and its users that it has finished. This can be done, e.g., by deriving a WorkUnit base class from QRunnable, forwarding the work to an abstract method, and notifying the queue when the abstract method has finished. The same approach works for QtConcurrent::run(), except that instead of reimplementing QRunnable::run() you implement a functor's operator()().

    The queue will emit a workUnitDone signal for every finished work unit. The user is expected to refill the queue with one item of work upon receiving the signal (or none if there's no more work).

    For convenience, the queue can request a number of initial work items, by emitting workUnitDone(nullptr). If you replenish exactly one item every time a previous one has finished, the queue will maintain the initial number of work items.
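
    The refill protocol described above can be sketched without Qt, using plain std::thread. This is only an illustration under stated assumptions: RefillStats and runWithRefill are hypothetical names, and the sketch starts one thread per item instead of using a thread pool. The locked tail of each thread plays the role of a slot connected to workUnitDone, and launch() plays the role of addWork().

    ```cpp
    #include <algorithm>
    #include <cassert>
    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Hypothetical result type: how many items ran and the peak number in flight.
    struct RefillStats {
        int completed = 0;
        int maxInFlight = 0;
    };

    // Runs `total` items: starts up to `window` of them, then launches exactly
    // one replacement each time an item finishes, while any items remain.
    RefillStats runWithRefill(int window, int total,
                              const std::function<void(int)>& work) {
        assert(window > 0 && total >= 0);
        std::mutex m;
        std::condition_variable allDone;
        std::vector<std::thread> threads;  // guarded by m
        int nextId = 0, inFlight = 0;      // guarded by m
        RefillStats stats;                 // guarded by m

        // Launches one item; the caller must hold m.
        std::function<void()> launch = [&] {
            const int id = nextId++;
            stats.maxInFlight = std::max(stats.maxInFlight, ++inFlight);
            threads.emplace_back([&, id] {
                work(id);                             // the job itself, outside the lock
                std::lock_guard<std::mutex> lock(m);  // the "workUnitDone" handler
                --inFlight;
                ++stats.completed;
                if (nextId < total) launch();         // refill with exactly one item
                allDone.notify_one();
            });
        };

        {   // The initial batch, like the workUnitDone(nullptr) requests.
            std::lock_guard<std::mutex> lock(m);
            for (int i = 0; i < window && nextId < total; ++i) launch();
        }
        {   // Wait until every item has completed; no launches happen after that.
            std::unique_lock<std::mutex> lock(m);
            allDone.wait(lock, [&] { return stats.completed == total; });
        }
        for (auto& t : threads) t.join();
        return stats;
    }
    ```

    Because each completion starts at most one replacement, the number of items in flight never exceeds the initial window, which is the invariant the paragraph above relies on.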

    If the items take a very short time to process, you should have many more available than the number of threads, so that no thread sits idle without work. For items that mostly take a long time (tens of milliseconds or more), it's sufficient to have 1.5-2 times QThread::idealThreadCount() items.
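
    As a small worked example of that sizing rule, here is a Qt-free sketch. The helper names initialUnits and initialUnitsForThisMachine are hypothetical, and std::thread::hardware_concurrency() stands in for QThread::idealThreadCount() purely as an assumption for illustration.

    ```cpp
    #include <cassert>
    #include <cmath>
    #include <thread>

    // Hypothetical helper: how many work items to keep queued for `threads` workers.
    // `factor` is the multiple of the thread count (1.5-2 for long-running items).
    int initialUnits(unsigned threads, double factor = 2.0) {
        assert(factor >= 1.0);
        if (threads == 0) threads = 1;  // hardware_concurrency() may report 0 if unknown
        return static_cast<int>(std::ceil(threads * factor));
    }

    // Convenience wrapper using the concurrency reported for this machine.
    int initialUnitsForThisMachine(double factor = 2.0) {
        return initialUnits(std::thread::hardware_concurrency(), factor);
    }
    ```

    With 4 threads this gives 6 items at a factor of 1.5 and 8 items at a factor of 2. For very short items, a much larger factor would be the analogous choice.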

    The work units added to the queue can be instances of WorkUnit, or functors.

    // https://github.com/KubaO/stackoverflown/tree/master/questions/notified-workqueue-38000605
    #include <QtCore>
    #include <type_traits>
    
    class WorkUnit;
    class WorkQueue : public QObject {
       Q_OBJECT
       friend class WorkUnit;
       QThreadPool m_pool{this};
       union alignas(64) { // keep it in its own cache line
          QAtomicInt queuedUnits{0};
          char filler[64];
       } d;
       void isDone(WorkUnit * unit) {
          auto queued = d.queuedUnits.deref();
          emit workUnitDone(unit);
          if (!queued) emit finished();
       }
    public:
       explicit WorkQueue(int initialUnits = 0) {
          if (initialUnits)
             QTimer::singleShot(0, [=]{
                for (int i = 0; i < initialUnits; ++i)
                   emit workUnitDone(nullptr);
             });
       }
       Q_SLOT void addWork(WorkUnit * unit);
       template <typename F> void addFunctor(F && functor);
       Q_SIGNAL void workUnitDone(WorkUnit *);
       Q_SIGNAL void finished();
    };
    
    class WorkUnit : public QRunnable {
       friend class WorkQueue;
       WorkQueue * m_queue { nullptr };
       void run() override {
          work();
          m_queue->isDone(this);
       }
    protected:
       virtual void work() = 0;
    };
    
    template <typename F>
    class FunctorUnit : public WorkUnit, private F {
       void work() override { (*this)(); }
    public:
       FunctorUnit(F f) : F(std::move(f)) {} // by value: accepts lvalue and rvalue functors
    };
    
    void WorkQueue::addWork(WorkUnit *unit) {
       d.queuedUnits.ref();
       unit->m_queue = this;
       m_pool.start(unit);
    }
    
    template <typename F> void WorkQueue::addFunctor(F && functor) {
       addWork(new FunctorUnit<typename std::decay<F>::type>{std::forward<F>(functor)});
    }
    

    To demonstrate things, let's do 50 units of "work", each sleeping for a random time between 1us and 1s. We pass half of the units as SleepyWork instances and the other half as lambdas.

    #include <random>
    
    struct SleepyWork : WorkUnit {
       int usecs;
       SleepyWork(int usecs) : usecs(usecs) {}
       void work() override {
          QThread::usleep(usecs);
          qDebug() << "slept" << usecs;
       }
    };
    
    int main(int argc, char ** argv) {
       QCoreApplication app{argc, argv};
       std::random_device dev;
       std::default_random_engine eng{dev()};
       std::uniform_int_distribution<int> dist{1, 1000000};
       auto rand_usecs = [&]{ return dist(eng); };
    
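       int workUnits = 50;
       QMutex mutex; // guards workUnits and the random engine; declared first so it outlives the queue
       WorkQueue queue{2*QThread::idealThreadCount()};
       // Connections made without a context object are direct: after the initial
       // batch, these lambdas run in the worker threads, so shared state is locked.
       QObject::connect(&queue, &WorkQueue::workUnitDone, [&]{
          QMutexLocker lock{&mutex};
          if (workUnits) {
             if (workUnits % 2) {
                auto us = rand_usecs();
                queue.addFunctor([us]{
                   QThread::usleep(us);
                   qDebug() << "slept" << us;
                });
             } else
                queue.addWork(new SleepyWork{rand_usecs()});
             --workUnits;
          }
       });
       QObject::connect(&queue, &WorkQueue::finished, [&]{
          QMutexLocker lock{&mutex};
          if (workUnits == 0) app.quit();
       });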
    
       return app.exec();
    }
    
    #include "main.moc"
    

    This concludes the example.