multithreading, c++11, mutex, stdthread

Sending jobs to a std::thread


I am quite new to std::thread, and I quickly realized that creating threads is quite costly, at least on my computer running Windows 7. So I decided to create my threads once and send jobs to them, based on this sample code: http://en.cppreference.com/w/cpp/thread/condition_variable

My code runs without crashing, but I didn't notice much of a performance increase. So I measured the difference between the time a job finishes and the time the main thread detects that it has finished (see WaitUntilJobDone()). On some rare occasions, the difference was over 2 milliseconds.

Does anyone see anything wrong with the code?

Code:

class CJobParameters
{
public:
};

typedef void (*CJobFunc)( const CJobParameters * );

class CThread
{   
public:
    void Start();
    void WaitUntilJobDone();
    void StartJob( CJobFunc inJobFunc, const CJobParameters * inJobParameters );

    std::thread m_stdThread;

    CJobFunc                m_jobFunc       = nullptr;
    const CJobParameters *  m_jobParameters = nullptr;
    //std::atomic<bool>     m_jobDone       = true;
    std::mutex              m_mutex;
    std::condition_variable m_cv;

    __int64 m_jobDoneAt = 0;
    __int64 m_threadJoinedAt = 0;
    __int64 m_lostTime = 0;
};

class CThreads
{
public:
    static void Start();
    static CThread  threadArray[ JOB_COUNT ];
};


void ThreadMain( CThread * inThread )
{
    while ( true )
    {
        std::unique_lock<std::mutex> lk( inThread->m_mutex );
        inThread->m_cv.wait(lk, [ inThread ]{return inThread->m_jobParameters != nullptr;});
        if ( inThread->m_jobFunc )
        {
            (*inThread->m_jobFunc)( inThread->m_jobParameters );
            inThread->m_jobFunc = nullptr;
            inThread->m_jobParameters = nullptr;
            inThread->m_jobDoneAt = COSToolbox::QuerySystemTime2();
        }
        lk.unlock();
        inThread->m_cv.notify_one();
        std::this_thread::sleep_for( std::chrono::nanoseconds(0) );
    }
}

void CThread::StartJob( CJobFunc inJobFunc, const CJobParameters * inJobParameters )
{
    std::lock_guard<std::mutex> lk( m_mutex );
    m_jobFunc           = inJobFunc;
    m_jobParameters     = inJobParameters;
    m_cv.notify_one();
}

void CThread::Start()
{
    m_stdThread = std::thread( ThreadMain, this );
}

void CThread::WaitUntilJobDone()
{
    std::unique_lock<std::mutex> lk( m_mutex );
    m_cv.wait(lk, [ this ]{return this->m_jobParameters == nullptr;});

    m_threadJoinedAt = COSToolbox::QuerySystemTime2();
    m_lostTime = m_threadJoinedAt - m_jobDoneAt;
    LOG_INFO( "Thread joined with %f ms lost", (Float32)m_lostTime / 1000 );
}


CThread CThreads::threadArray[ JOB_COUNT ];
void CThreads::Start()
{
    for ( Int32 i = 0; i < JOB_COUNT; ++i )
    {
        threadArray[i].Start();
    }
}

void MyJobFunc( const CJobParameters  * jobParameters )
{
    // do job here
}
void main()
{
    CThreads::Start();
    while(true)
    {
        CJobParameters jobParametersArray[ JOB_COUNT ];
        for ( Int32 i = 0; i < JOB_COUNT; ++i )
        {
            CThread & thread = CThreads::threadArray[ i ];
            CJobParameters& jobParameters = jobParametersArray[ i ];
            jobParameters.m_ // Fill in params
            thread.StartJob( &MyJobFunc, &jobParameters );
        }
        for ( Int32 i = 0; i < JOB_COUNT; ++i )
        {
            CThread & thread = CThreads::threadArray[ i ];
            // Sometimes prints 2 ms with i = 0
            thread.WaitUntilJobDone();
        }
     }
}

Solution

  • Two things:

    You are yielding your processor time unconditionally, and on some older versions of Windows you yield the entire process, not just the thread:

    std::this_thread::sleep_for( std::chrono::nanoseconds(0) );
    

    This yield is unnecessary. I suspect you added it because without it you were getting a spin loop, which results from the fact that both threads wait on and notify a single condition variable.

    You need two condition variables: one for work pending and one for work done. Typically the listener passes the condition variable, or a struct containing it, as a parameter to the thread function, which lets you pass a single condition variable from your dispatcher.
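
The two-condition-variable idea could look roughly like the sketch below. It is not the asker's CThread: the Worker class, its member names (workPending_, workDone_, busy_, stop_), and the use of std::function are all assumptions made for illustration. It also assumes the caller waits for each job to finish before starting the next one, as the main loop in the question does.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical worker with one condition variable per direction:
// workPending_ wakes the worker, workDone_ wakes the dispatcher.
class Worker {
public:
    Worker() : thread_([this] { Run(); }) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            stop_ = true;
        }
        workPending_.notify_one();
        thread_.join();
    }

    // Assumption: the previous job has finished (WaitUntilJobDone returned).
    void StartJob(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lk(mutex_);
            job_ = std::move(job);
            busy_ = true;
        }
        workPending_.notify_one();  // only the worker waits on this one
    }

    void WaitUntilJobDone() {
        std::unique_lock<std::mutex> lk(mutex_);
        workDone_.wait(lk, [this] { return !busy_; });
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lk(mutex_);
        for (;;) {
            workPending_.wait(lk, [this] { return stop_ || static_cast<bool>(job_); });
            if (stop_) return;

            std::function<void()> job = std::move(job_);
            job_ = nullptr;   // a moved-from std::function has unspecified state

            lk.unlock();      // run the job without holding the mutex
            job();
            lk.lock();

            busy_ = false;
            workDone_.notify_one();  // only the dispatcher waits on this one
        }
    }

    std::mutex mutex_;
    std::condition_variable workPending_;
    std::condition_variable workDone_;
    std::function<void()> job_;
    bool busy_ = false;
    bool stop_ = false;
    std::thread thread_;  // declared last so the members above exist before Run() starts
};
```

With this layout a notification can only wake the thread it is meant for, so no sleep_for or yield is needed in the worker loop. A dispatcher would call StartJob on each worker, then WaitUntilJobDone on each, much like the two loops in the question's main().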