Search code examples
c++winapilockingthreadpool

Disappointing performance of custom thread pool


I am stuck with a certain old code base which will not compile with anything but Borland compilers. The code does some computations that would run nicely in parallel, but since the compiler does not support OpenMP, I tried to come up with my own thread pool built around WinAPI CONDITION_VARIABLEs and SRWLOCKs. The pool itself seems to work fine, except that the performance is not much better than that of single-threaded code. I find it rather difficult to believe that the locking logic would be so heavy that it would completely obliterate the benefit of parallel processing. A quick look at Process Explorer tells me that each of my 8 workers (quad-core HT CPU) uses about 0.5 % of CPU time, which makes me think that the workers are spending most of their time sleeping.

What am I missing here? And yes, I am sure that the bit I am trying to run in parallel is the hottest path.

Some relevant bits of code:

/*
 * Synchronization point shared by the dispatching thread and the workers.
 *
 * "working" counts the jobs that are still outstanding. Workers sleep on
 * "waitForWork" until they are given a job; the dispatcher sleeps on
 * "workDone" until "working" drops back to zero. All of this state is
 * guarded by "lock".
 */
class Barrier {
public:
    /* Sets up the lock and both condition variables; no job is outstanding yet */
    Barrier(const int workers) :
            working(0),
            workers(workers)
    {
            ::InitializeSRWLock(&lock);
            ::InitializeConditionVariable(&waitForWork);
            ::InitializeConditionVariable(&workDone);
    }

    ~Barrier()
    {
    }

    /*
     * Starts the prepared jobs and blocks until all of them have finished.
     *
     * The function begins by releasing "lock" in exclusive mode, so the caller
     * has to hold the lock exclusively on entry (WorkingBlock::Process acquires
     * it before setting up the workers). The lock is not held on return.
     */
    void Rendezvous()
    {
            /* Hand the lock over so that woken workers can re-acquire it in shared mode */
            ::ReleaseSRWLockExclusive(&lock);

            ::WakeAllConditionVariable(&waitForWork);

            /* Loop guards against wake-ups that arrive while jobs are still outstanding */
            ::AcquireSRWLockExclusive(&lock);
            while (working > 0)
                    ::SleepConditionVariableSRW(&workDone, &lock, INFINITE, 0);
            ::ReleaseSRWLockExclusive(&lock);
    }

    /* Number of jobs that have been handed out and not yet completed */
    volatile long working;

    SRWLOCK lock;
    CONDITION_VARIABLE waitForWork;     /* Signalled when new jobs are ready */
    CONDITION_VARIABLE workDone;        /* Signalled by the worker that finishes last */

private:
    /* Stored by the constructor; not read anywhere in the code shown here */
    const long workers;
};

/*
 * Per-thread state of one pool worker.
 *
 * The "processing" and "terminate" flags are what WorkerProc waits on; both
 * are meant to be changed by the dispatcher while it holds the barrier lock.
 */
class Worker {
public:
    Worker(Barrier *_bar) :
            /* Some worker data */
            processing(false),
            terminate(false),
            failed(false),
            hThread(NULL),
            threadId(0),
            bar(_bar)
    {
    }
    
    /* Some worker data */

    bool processing;    /* Set by the dispatcher when a job is ready, cleared by the worker when done */
    bool terminate;     /* Asks the worker thread to leave its loop */
    bool failed;

    HANDLE hThread;
    DWORD threadId;

    Barrier *bar;       /* Barrier shared with the dispatcher; not owned by the worker */

private:
    /* Copying is not supported; the copy constructor is private to prevent it */
    Worker(const Worker &other)
    {
    }
};

/*
 * Hands one job to every worker and waits until all of them are done.
 *
 * The barrier lock is taken exclusively here and is released inside
 * Barrier::Rendezvous(), which also wakes the workers up.
 *
 * NOTE(review): the function is declared to return bool but no return
 * statement is visible; presumably it was elided together with the
 * result processing.
 */
bool WorkingBlock::Process(/* Some worker data */)
{
    /* Workers cannot observe half-prepared jobs while this lock is held */
    ::AcquireSRWLockExclusive(&m_barrier->lock);
    for (int thr = 0; thr < int(m_NThreads); thr++) {
            Worker *wrk = m_workers->operator[](thr);
            /* Setup workers */
            PrepareWorker(wrk); /* This increments the "working" variable in barrier */
            wrk->processing = true;
    }

    /* Wait till workers finish */
    m_barrier->Rendezvous();
    
    /* Process results */
}

/*
 * Thread entry point of a pool worker.
 *
 * param is the Worker this thread serves. The thread sleeps until it is
 * either given a job or asked to terminate, runs the job, reports its
 * completion to the barrier and then goes back to sleep.
 *
 * Returns 0 once termination has been requested.
 */
inline
DWORD WINAPI WorkerProc(LPVOID param)
{
    Worker *wrk = static_cast<Worker *>(param);
    
    while (true) {
        /* Shared mode lets all workers check their flags at the same time */
        ::AcquireSRWLockShared(&wrk->bar->lock);
        while (!wrk->processing && !wrk->terminate)
            ::SleepConditionVariableSRW(&wrk->bar->waitForWork, &wrk->bar->lock,
                                        INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED);
        ::ReleaseSRWLockShared(&wrk->bar->lock);

        if (wrk->terminate) {
            return 0;
        }
    
        /* Do the calculation */

        /* Cleared without holding the lock */
        wrk->processing = false;

        /* Report completion; whoever brings the counter to zero wakes the dispatcher */
        ::AcquireSRWLockExclusive(&wrk->bar->lock);
        wrk->bar->working--;
        if (wrk->bar->working == 0) {
            ::ReleaseSRWLockExclusive(&wrk->bar->lock);
            ::WakeConditionVariable(&wrk->bar->workDone);
        } else
            ::ReleaseSRWLockExclusive(&wrk->bar->lock);
    }

    /* Not reached; the loop above only exits through the return inside it */
    return 0;
}

I spin up the worker threads beforehand and have them sleep and wait until there is a new batch of work ready. Is there some kind of synchronization clash that I am missing?

EDIT: Added usage of the processing Worker flag to the code.


Solution

  • After some more investigation it turned out that the profiler results needed a bit of extra interpretation. Additionally, there were a lot of surprisingly inefficient bits of code all over the main loop. Taking care of the worst of them and parallelizing the loop in a few more places got me a decent performance boost. With a large enough problem I can get up to ~60 % average CPU utilization on a 4C/8T CPU. It's nowhere near as good as it would be with OpenMP, but it is better than nothing.

    For future reference, this is the OpenMP-like thread pool for parallelizing for-loops I ended up with.

    SThreadPool.h

    #ifndef STHREADPOOL_H
    #define STHREADPOOL_H
    
    #include <stdexcept>
    #include <vector>
    
    #if (__cplusplus >= 201103L) || (_MSC_VER >= 1900)
            #define STP_HAVE_CPP11
    #endif // CPP11 check
    
    #ifdef STP_HAVE_CPP11
            #define STP_NOTHROW noexcept
    #else
            #define STP_NOTHROW throw()
    #endif // STP_HAVE_CPP11
    
    namespace stpool {
    
    /*!
     * Exception type thrown by the thread pool; the message is passed
     * on to std::runtime_error.
     */
    class Exception : public std::runtime_error {
    public:
            /*!
             * @param[in] msg Description of the failure
             */
            explicit Exception(const char *msg);
    };
    
    /*!
     * Outcome of one worker cycle, stored in Worker::result
     */
    enum WorkerResult {
            WR_SUCCESS,     /*!< Worker finished correctly, results are OK */
            WR_FAILURE,     /*!< Worker finished abnormally, results should be discarded */
            WR_SKIPPED,     /*!< Worker did not execute because the job size was too small */
            WR_INVALID      /*!< This state must never be returned */
    };
    
    class Barrier;
    class WorkerPrivate;
    
    /*!
     * Public view of one worker of the pool. Copying is disabled.
     */
    class Worker {
    public:
            Worker();
    
            WorkerResult result;    /*!< Result of last worker cycle */
            WorkerPrivate *priv;    /*!< Internal worker data, do not touch! */
    
    private:
            /* Declared but intentionally not usable from outside: workers are not copyable */
            Worker(const Worker &other);
            Worker & operator=(const Worker &other);
    };
    
    /*! Vector of pointers to workers */
    typedef std::vector<Worker *> WorkerVec;
    /*! Read-only reference to a vector of workers, as returned by ThreadPool::Process() */
    typedef const std::vector<Worker *> & CWorkerVecRef;
    /*! Opaque pointer to job-specific data handed through to the WorkerFunc */
    typedef void * Payload;
    /*! One payload per worker thread */
    typedef std::vector<Payload> PayloadVec;
    
    /*!
     * WorkerFunc prototype.
     *
     * @param[in] from First index of the portion of the loop
     * @param[in] to Last index of the portion of the loop; whether this index
     *               is itself part of the portion follows from the
     *               TerminationPolicy the job was started with
     * @param[in,out] p Data specific for the given job
     *
     * @retval true Job succeeded
     * @retval false Job failed
     */
    typedef bool (*WorkerFunc)(const int from, const int to, Payload p);
    
    /*!
     * Specifies the condition for the last element in the loop
     */
    enum TerminationPolicy {
            TPOL_INCLUSIVE, /*!< "<=" terminating condition */
            TPOL_EXCLUSIVE  /*!< "<" terminating condition */
    };
    
    /*!
     * ThreadPool object capable of executing for-loops in parallel.
     * Think of this as poor man's OpenMP...
     *
     * The pool cannot be copied.
     */
    class ThreadPool {
    public:
            /*!
             * ThreadPool c-tor
             *
             * @param[in] NThreads Number of worker threads to prepare
             *
             * @throws Exception if NThreads is less than 1 or a worker thread
             *                   could not be created
             */
            ThreadPool(const long NThreads);
            ~ThreadPool();
    
            /*!
             * Runs the parallel job. Payloads must be set up before this function
             * is called. The call returns only after all workers that were given
             * a portion of the loop have finished it.
             *
             * @tparam Policy Says whether @p to is itself part of the loop
             *
             * @param[in] from First index in the loop
             * @param[in] to Last index in the loop
             * @param[in,out] payloads Vector of data specific for the given loop.
             *                          Size of the vector must be the same as the number
             *                          of worker threads.
             * @param[in] func Function that performs the actual calculation
             *
             * @return Vector of finished workers
             */
            template <TerminationPolicy Policy>
            CWorkerVecRef Process(const int from, const int to,
                                  const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW;
    
            /*!
             * Returns number of available worker threads
             *
             * @return Number of available worker threads
             */
            long Threads() const;
    
    private:
            /* Not copyable */
            ThreadPool(const ThreadPool &other);
            ThreadPool & operator=(const ThreadPool &other);
    
            /* Asks all workers to terminate, waits for their threads and frees them */
            void Cleanup();
            /* Assigns a portion of the loop to the given worker */
            void PrepareWorker(Worker *wrk, const int from, const int to,
                               Payload payload, WorkerFunc func);
            /* Marks the given worker as having nothing to do in the current job */
            void SkipWorker(Worker *wrk);
    
            WorkerVec m_workers;    /* One entry per worker thread */
            Barrier *m_barrier;     /* Synchronization shared with the worker threads */
    
            const long m_NThreads;  /* Number of worker threads requested in the c-tor */
    };
    
    /*!
     * Number of available logical CPUs
     *
     * @return Processor count as reported by the operating system
     */
    int NumOfCPUs();
    
    } // namespace stpool
    
    #endif // STHREADPOOL_H
    

    SThreadPool.cpp

    #include "SThreadPool.h"
    
    #include <Windows.h>
    
    #include <cassert>
    
    #define STP_USE_SRWLOCK 1
    #define STP_USE_SYSTEM_SRWLOCK 0
    #define STP_SPIN_FOR_RDVZ 0
    
    /* Spinning for rendezvous seems to be more efficient only
     * when there are fewer workers than available CPUs */
    
    /* Thin lock abstraction: the rest of the file uses only the macros below,
     * which map either to slim reader/writer locks or to critical sections. */
    #if STP_USE_SRWLOCK
            #if !STP_USE_SYSTEM_SRWLOCK
                    /* Supplies the SRWLOCK names from their RTL_ counterparts;
                     * presumably needed where the system headers do not declare them */
                    #define SRWLOCK_INIT RTL_SRWLOCK_INIT
                    typedef RTL_SRWLOCK SRWLOCK, *PSRWLOCK;
            #endif // USE_SYSTEM_SRWLOCK
    
            #define LOCK_PRIM SRWLOCK
            #define InitLock(lk) ::InitializeSRWLock(lk)
            /* Expands to nothing for SRW locks */
            #define DeleteLock(lk)
            #define AcquireLkExcl(lk) ::AcquireSRWLockExclusive(lk)
            #define AcquireLkShared(lk) ::AcquireSRWLockShared(lk)
            #define ReleaseLkExcl(lk) ::ReleaseSRWLockExclusive(lk)
            #define ReleaseLkShared(lk) ::ReleaseSRWLockShared(lk)
            #define WaitCondExcl(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, 0)
            #define WaitCondShared(wc, lk) ::SleepConditionVariableSRW(wc, lk, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED)
    #else
            /* With critical sections the "shared" variants map to the same calls
             * as the exclusive ones */
            #define LOCK_PRIM CRITICAL_SECTION
            #define InitLock(lk) ::InitializeCriticalSectionAndSpinCount(lk, 5000)
            #define DeleteLock(lk) ::DeleteCriticalSection(lk)
            #define AcquireLkExcl(lk) ::EnterCriticalSection(lk)
            #define AcquireLkShared(lk) ::EnterCriticalSection(lk)
            #define ReleaseLkExcl(lk) ::LeaveCriticalSection(lk)
            #define ReleaseLkShared(lk) ::LeaveCriticalSection(lk)
            #define WaitCondExcl(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
            #define WaitCondShared(wc, lk) ::SleepConditionVariableCS(wc, lk, INFINITE)
    #endif // STP_USE_SRWLOCK
    
    namespace stpool {
    
    /* Forwards the message to the std::runtime_error base */
    Exception::Exception(const char *msg) :
            std::runtime_error(msg)
    {
    }
    
    /*!
     * Synchronization state shared by the pool and its worker threads.
     *
     * "working" is incremented by ThreadPool::PrepareWorker() for every job
     * handed out and decremented by ThreadProc() when a job is done.
     * Workers sleep on "waitForWork"; unless STP_SPIN_FOR_RDVZ is enabled
     * the dispatching thread sleeps on "workDone".
     */
    class Barrier {
    public:
            /*!
             * @param[in] workers Number of worker threads; only stored
             */
            Barrier(const long workers) :
                    working(0),
                    workers(workers)
            {
                    InitLock(&lock);
                    ::InitializeConditionVariable(&waitForWork);
            #if !STP_SPIN_FOR_RDVZ
                    ::InitializeConditionVariable(&workDone);
            #endif // STP_SPIN_FOR_RDVZ
            }
    
            ~Barrier()
            {
                    /* Expands to nothing when SRW locks are used */
                    DeleteLock(&lock);
            }
    
            /* Wakes the workers and waits until "working" is back at zero;
             * defined out of line below */
            void Rendezvous();
    
            /* Number of jobs handed out and not yet finished */
            volatile long working;
    
            LOCK_PRIM lock;
            CONDITION_VARIABLE waitForWork; /* Workers wait here for a job or for termination */
    #if !STP_SPIN_FOR_RDVZ
            CONDITION_VARIABLE workDone;    /* Dispatcher waits here for the last job to finish */
    #endif // STP_SPIN_FOR_RDVZ
    
    private:
            /* Not read anywhere in this file */
            const long workers;
    };
    
    /*!
     * Starts the prepared jobs and blocks until all of them have finished.
     *
     * The first thing done here is an exclusive release of "lock", so the
     * caller must hold the lock exclusively on entry; both
     * ThreadPool::Process() specializations acquire it before preparing the
     * workers. The lock is not held when the function returns.
     */
    void Barrier::Rendezvous()
    {
            /* Let the workers in; they take the lock in shared mode to read their flags */
            ReleaseLkExcl(&lock);
    
            ::WakeAllConditionVariable(&waitForWork);
    
    #if !STP_SPIN_FOR_RDVZ
            /* Sleep until the worker that finishes last signals workDone;
             * the loop re-checks the counter after every wake-up */
            AcquireLkExcl(&lock);
            while (working > 0)
                    WaitCondExcl(&workDone, &lock);
            ReleaseLkExcl(&lock);
    #else
            /* Busy-wait variant: poll the counter without taking the lock */
            while (working > 0)
                    YieldProcessor();
    #endif // STP_SPIN_FOR_RDVZ
    }
    
    /*
     * Creates a worker that has not run any job yet. The private data pointer
     * starts out as NULL so that it never holds an indeterminate value before
     * the pool attaches the real WorkerPrivate object.
     */
    Worker::Worker() :
            result(WR_FAILURE),
            priv(NULL)
    {
    }
    
    /*!
     * Internal per-worker state: the job description written by
     * ThreadPool::PrepareWorker(), the control flags the worker thread waits
     * on and the handle of the thread itself.
     */
    class WorkerPrivate {
    public:
            /* No job assigned, no thread attached */
            WorkerPrivate() :
                    from(-1),
                    to(-1),
                    payload(NULL),
                    func(NULL),
                    process(false),
                    terminate(false),
                    failed(false),
                    threadId(0),
                    hThread(NULL),
                    barrier(NULL)
            {
            }
    
            /* Job description, passed to func as-is */
            int from;
            int to;
            Payload payload;
            WorkerFunc func;
    
            bool process;           /* Set when a job is ready, cleared by the worker after running it */
            bool terminate;         /* Set by ThreadPool::Cleanup() to make the thread exit */
            bool failed;            /* Checked by ThreadPool::Cleanup(); never set to true in this file */
    
            DWORD threadId;
            HANDLE hThread;
            Barrier *barrier;       /* Barrier of the owning pool; not owned by the worker */
    };
    
    /*!
     * Entry point of every worker thread.
     *
     * The thread sleeps on the barrier until it is either given a job or asked
     * to terminate. A job is run by calling the stored WorkerFunc with the
     * stored range and payload; its return value is translated to WR_SUCCESS
     * or WR_FAILURE. The worker that finishes last wakes the dispatcher up.
     *
     * @param[in] param Pointer to the Worker served by this thread; its
     *                  private data must be attached before the thread starts
     *
     * @return Always 0, once termination has been requested
     */
    static
    DWORD WINAPI ThreadProc(LPVOID param)
    {
            Worker *wrk = static_cast<Worker *>(param);
            WorkerPrivate *priv = wrk->priv;
    
            while (true) {
            #ifdef STP_PRN_TPTS
                    {
                    /* The thread ID lives in the private data, not in Worker itself */
                    AnsiString str("Worker waiting: ");
                    str += priv->threadId;
                    OutputDebugStringA(str.c_str());
                    }
            #endif // STP_PRN_TPTS
    
                    /* The flags are read under the shared lock; the dispatcher
                     * changes them while holding the lock exclusively */
                    AcquireLkShared(&priv->barrier->lock);
                    while (!priv->process && !priv->terminate)
                            WaitCondShared(&priv->barrier->waitForWork, &priv->barrier->lock);
                    ReleaseLkShared(&priv->barrier->lock);
    
                    if (priv->terminate)
                            return 0;
    
                    assert(priv->payload != NULL);
                    assert(priv->func != NULL);
    
                    const bool ret = priv->func(priv->from, priv->to, priv->payload);
                    wrk->result = ret ? WR_SUCCESS : WR_FAILURE;
    
                    /* Must be cleared before the job is reported as done, otherwise
                     * the next wake-up could re-run the same job */
                    priv->process = false;
    
            #ifdef STP_PRN_TPTS
                    {
                    AnsiString str("Worker done: ");
                    str += priv->threadId;
                    OutputDebugStringA(str.c_str());
                    }
            #endif // STP_PRN_TPTS
    
            #if !STP_SPIN_FOR_RDVZ
                    /* Report completion; the lock is dropped before the dispatcher is
                     * woken so that it does not immediately block on it again */
                    AcquireLkExcl(&priv->barrier->lock);
                    const bool lastOne = (--priv->barrier->working == 0);
                    ReleaseLkExcl(&priv->barrier->lock);
    
                    if (lastOne)
                            ::WakeConditionVariable(&priv->barrier->workDone);
            #else
                    ::InterlockedDecrement(&priv->barrier->working);
            #endif // SPIN_FOR_RDVZ
            }
    
            /* Not reached; the loop only exits through the return inside it */
            return 0;
    }
    
    /*!
     * Creates the barrier and starts NThreads worker threads, which
     * immediately go to sleep waiting for the first job.
     *
     * @param[in] NThreads Number of worker threads to prepare
     *
     * @throws Exception if NThreads is less than 1 or if a worker thread
     *                   could not be created. In the latter case the threads
     *                   started so far are shut down and the barrier is freed
     *                   before the exception leaves the c-tor.
     */
    ThreadPool::ThreadPool(const long NThreads) :
            m_barrier(NULL),
            m_NThreads(NThreads)
    {
            if (m_NThreads < 1)
                    throw Exception("Invalid argument");
    
            m_barrier = new Barrier(m_NThreads);
    
            m_workers.reserve(m_NThreads);
            for (long thr = 0; thr < m_NThreads; thr++) {
                    Worker *wrk = new Worker();
                    WorkerPrivate *priv = new WorkerPrivate;
    
                    /* Everything the thread reads must be in place before the thread
                     * exists: ThreadProc dereferences wrk->priv and priv->barrier
                     * as soon as it starts running */
                    priv->barrier = m_barrier;
                    wrk->priv = priv;
    
                    HANDLE hThread = ::CreateThread(NULL, 0, ThreadProc, wrk, 0, &priv->threadId);
                    if (hThread == NULL) {
                            delete priv;
                            delete wrk;
                            Cleanup();
    
                            /* The d-tor does not run for a throwing c-tor, so the
                             * barrier has to be released here */
                            delete m_barrier;
                            m_barrier = NULL;
    
                            throw Exception("Failed to initialize thread pool");
                    }
    
                    priv->hThread = hThread;
    
                    m_workers.push_back(wrk);
            }
    }
    
    /*!
     * Stops and frees all workers, then frees the barrier. The order matters:
     * Cleanup() still uses the barrier to wake the workers up.
     */
    ThreadPool::~ThreadPool()
    {
            Cleanup();
    
            delete m_barrier;
    }
    
    void ThreadPool::Cleanup()
    {
            AcquireLkExcl(&m_barrier->lock);
            for (size_t idx = 0; idx < m_workers.size(); idx++)
                    m_workers[idx]->priv->terminate = true;
            ReleaseLkExcl(&m_barrier->lock);
    
            ::WakeAllConditionVariable(&m_barrier->waitForWork);
    
            for (size_t idx = 0; idx < m_workers.size(); idx++) {
                    Worker *wrk = m_workers[idx];
    
                    if (!wrk->priv->failed) {
                            ::WaitForSingleObject(wrk->priv->hThread, INFINITE);
                            ::CloseHandle(wrk->priv->hThread);
                    }
    
                    delete wrk->priv;
                    delete wrk;
            }
    }
    
    /*!
     * Runs func over the half-open range [from, to) split among the workers.
     *
     * Every worker except the last receives a slice of roughly
     * (to - from) / NThreads iterations (at least one); the last worker takes
     * whatever is left. Workers for which nothing is left are marked
     * WR_SKIPPED and are not woken up for work. The call blocks until all
     * dispatched jobs are done.
     *
     * @param[in] from First index in the loop
     * @param[in] to Index one past the last one processed
     * @param[in,out] payloads One payload per worker thread
     * @param[in] func Function that performs the actual calculation
     *
     * @return Vector of workers; check Worker::result of each of them
     */
    template <>
    CWorkerVecRef ThreadPool::Process<TPOL_EXCLUSIVE>(const int from, const int to,
                                                      const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
    {
            assert(to > from);
            assert(m_workers.size() == payloads.size());
    
            /* A slice rounded down to zero would hand empty ranges to all workers
             * but the last one, which would then run the whole loop alone */
            int slice = int(float(to - from) / m_NThreads + 0.5F);
            if (slice < 1)
                    slice = 1;
    
    #ifdef STP_PRN_TPTS
            OutputDebugStringA("--- Para start");
    #endif // STP_PRN_TPTS
    
            int idx = from;
    
            /* Held while the jobs are set up; released inside Rendezvous() */
            AcquireLkExcl(&m_barrier->lock);
            for (long thr = 0; thr < m_NThreads - 1; thr++) {
                    Worker *wrk = m_workers[thr];
    
                    /* Nothing left: mark this and every following worker as skipped
                     * so that none of them keeps a result from an earlier run */
                    if (idx >= to) {
                            SkipWorker(wrk);
                            continue;
                    }
    
                    int realTo = idx + slice;
                    if (realTo > to)
                            realTo = to;
    
                    PrepareWorker(wrk, idx, realTo, payloads[thr], func);
                    idx += slice;
            }
    
            /* The last worker picks up the remainder, if there is any */
            Worker *wrk = m_workers.back();
            if (idx < to)
                    PrepareWorker(wrk, idx, to, payloads.back(), func);
            else
                    SkipWorker(wrk);
    
            m_barrier->Rendezvous();
    
            return m_workers;
    }
    
    /*!
     * Runs func over the closed range [from, to] split among the workers.
     *
     * Every worker except the last receives the indices idx .. idx + slice
     * (both included); the last worker takes whatever is left. Workers for
     * which nothing is left are marked WR_SKIPPED. The call blocks until all
     * dispatched jobs are done.
     *
     * @param[in] from First index in the loop
     * @param[in] to Last index in the loop, itself processed
     * @param[in,out] payloads One payload per worker thread
     * @param[in] func Function that performs the actual calculation
     *
     * @return Vector of workers; check Worker::result of each of them
     */
    template <>
    CWorkerVecRef ThreadPool::Process<TPOL_INCLUSIVE>(const int from, const int to,
                                                      const PayloadVec &payloads, WorkerFunc func) STP_NOTHROW
    {
            assert(to >= from);
            assert(m_workers.size() == payloads.size());
    
            const int slice = int(float(to - from) / m_NThreads + 0.5F);
    
    #ifdef STP_PRN_TPTS
            OutputDebugStringA("--- Para start");
    #endif // STP_PRN_TPTS
    
            int idx = from;
    
            /* Held while the jobs are set up; released inside Rendezvous() */
            AcquireLkExcl(&m_barrier->lock);
            for (long thr = 0; thr < m_NThreads - 1; thr++) {
                    Worker *wrk = m_workers[thr];
    
                    /* Nothing left: mark this and every following worker as skipped
                     * so that none of them keeps a result from an earlier run */
                    if (idx > to) {
                            SkipWorker(wrk);
                            continue;
                    }
    
                    int realTo = idx + slice;
                    if (realTo > to)
                            realTo = to;
    
                    PrepareWorker(wrk, idx, realTo, payloads[thr], func);
                    /* realTo was processed too, so the next slice starts one past it */
                    idx += slice + 1;
            }
    
            /* The last worker picks up the remainder, if there is any */
            Worker *wrk = m_workers.back();
            if (idx <= to)
                    PrepareWorker(wrk, idx, to, payloads.back(), func);
            else
                    SkipWorker(wrk);
    
            m_barrier->Rendezvous();
    
            return m_workers;
    }
    
    /*!
     * Stores a job description in the worker and counts the job as
     * outstanding. Both Process() specializations call this while holding the
     * barrier lock exclusively.
     *
     * @param[in,out] wrk Worker that will run the job
     * @param[in] from First index of the worker's portion of the loop
     * @param[in] to Last index of the worker's portion of the loop
     * @param[in] payload Data handed through to func
     * @param[in] func Function that performs the actual calculation
     */
    void ThreadPool::PrepareWorker(Worker *wrk, const int from, const int to,
                                   Payload payload, WorkerFunc func)
    {
            /* Overwritten by the worker thread once the job has run */
            wrk->result = WR_INVALID;
    
            WorkerPrivate &job = *wrk->priv;
            job.func = func;
            job.payload = payload;
            job.from = from;
            job.to = to;
            job.process = true;
    
            ++m_barrier->working;
    }
    
    /*!
     * Marks a worker that gets no portion of the current loop. The worker is
     * not given a job and is not counted as outstanding.
     *
     * @param[in,out] wrk Worker to mark as WR_SKIPPED
     */
    void ThreadPool::SkipWorker(Worker *wrk)
    {
            wrk->result = WR_SKIPPED;
    }
    
    /*!
     * @return Number of worker threads the pool was created with
     */
    long ThreadPool::Threads() const
    {
            return m_NThreads;
    }
    
    int NumOfCPUs()
    {
            SYSTEM_INFO info;
    
            GetSystemInfo(&info);
            return info.dwNumberOfProcessors;
    }
    
    } // namespace stpool