I'm looking for a way to wait for a number of jobs to finish, and then execute another, completely different set of jobs. With threads, of course. A brief explanation: I created two worker threads, both executing run on io_service. The code below is taken from here.
For the sake of simplicity, I created two types of jobs, CalculateFib and CalculateFib2. I want the CalculateFib2 jobs to start after, and only after, the CalculateFib jobs finish. I tried to use a condition variable as explained here, but the program hangs if there is more than one CalculateFib2 job. What am I doing wrong?
thx, dodol
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>

boost::mutex global_stream_lock;
boost::mutex mx;
boost::condition_variable cv;

void WorkerThread( boost::shared_ptr< boost::asio::io_service > io_service)
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] Thread Start" << std::endl;
    global_stream_lock.unlock();

    io_service->run();

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] Thread Finish" << std::endl;
    global_stream_lock.unlock();
}

size_t fib( size_t n )
{
    if ( n <= 1 )
    {
        return n;
    }
    boost::this_thread::sleep(
        boost::posix_time::milliseconds( 1000 )
    );
    return fib( n - 1 ) + fib( n - 2);
}

void CalculateFib( size_t n )
{
    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] Now calculating fib( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] fib( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();

    boost::lock_guard<boost::mutex> lk(mx);
    cv.notify_all();
}

void CalculateFib2( size_t n )
{
    boost::unique_lock<boost::mutex> lk(mx);
    cv.wait(lk);

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] Now calculating fib2( " << n << " ) " << std::endl;
    global_stream_lock.unlock();

    size_t f = fib( n );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] fib2( " << n << " ) = " << f << std::endl;
    global_stream_lock.unlock();
}

int main( int argc, char * argv[] )
{
    boost::shared_ptr< boost::asio::io_service > io_service(
        new boost::asio::io_service
    );
    boost::shared_ptr< boost::asio::io_service::work > work(
        new boost::asio::io_service::work( *io_service )
    );

    global_stream_lock.lock();
    std::cout << "[" << boost::this_thread::get_id()
              << "] The program will exit when all work has finished."
              << std::endl;
    global_stream_lock.unlock();

    boost::thread_group worker_threads;
    for( int x = 0; x < 2; ++x )
    {
        worker_threads.create_thread(
            boost::bind( &WorkerThread, io_service )
        );
    }

    io_service->post( boost::bind( CalculateFib, 5 ) );
    io_service->post( boost::bind( CalculateFib, 4 ) );
    io_service->post( boost::bind( CalculateFib, 3 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );
    io_service->post( boost::bind( CalculateFib2, 1 ) );

    work.reset();
    worker_threads.join_all();
    return 0;
}
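Inside CalculateFib2, the first thing you do is wait for the condition (cv). This condition only gets signaled at the end of CalculateFib. A condition variable does not remember notifications: a notify_all() issued while nobody is waiting is simply lost. So it stands to reason that execution never continues unless the condition is triggered again afterwards (by posting another CalculateFib job).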
Indeed, adding one more CalculateFib job like so:
io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib, 5 ) ); // <-- ADDED
makes execution run to completion.
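The lost notification described above can be avoided by pairing the condition variable with a piece of state and waiting on a predicate. The following is a minimal sketch, not the code from the question: PhaseGate, job_done, wait_open and is_open are hypothetical names, and it uses the standard-library std::mutex and std::condition_variable so that it stands alone (boost::condition_variable offers the same wait(lock, predicate) overload).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper (not from the original post): a gate that opens once
// a fixed number of first-phase jobs have reported completion.
class PhaseGate {
public:
    explicit PhaseGate(std::size_t pending) : pending_(pending) {}

    // Called as the last step of each first-phase job
    // (the role CalculateFib plays in the question).
    void job_done() {
        std::lock_guard<std::mutex> lk(mx_);
        assert(pending_ > 0 && "job_done() called more often than there are jobs");
        if (--pending_ == 0) {
            cv_.notify_all();  // wake every waiter once the last job is done
        }
    }

    // Called as the first step of each second-phase job
    // (the role CalculateFib2 plays in the question).
    // The predicate is evaluated before blocking, so a waiter that arrives
    // after the notification returns immediately instead of hanging.
    void wait_open() {
        std::unique_lock<std::mutex> lk(mx_);
        cv_.wait(lk, [this] { return pending_ == 0; });
    }

    // Non-blocking check, mainly useful for diagnostics.
    bool is_open() {
        std::lock_guard<std::mutex> lk(mx_);
        return pending_ == 0;
    }

private:
    std::mutex mx_;
    std::condition_variable cv_;
    std::size_t pending_;  // first-phase jobs that have not finished yet
};
```

With something like this, each CalculateFib would call job_done() as its last step and each CalculateFib2 would call wait_open() first. It still parks a pool thread for every waiting job, so it is only safe when the first-phase jobs are guaranteed to get a thread to run on.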
In an effort to shed more light: if you isolate a Fib2 batch (in time) like
io_service->post( boost::bind( CalculateFib, 5 ) );
io_service->post( boost::bind( CalculateFib, 4 ) );
io_service->post( boost::bind( CalculateFib, 3 ) );
boost::this_thread::sleep(boost::posix_time::seconds( 10 ));
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
io_service->post( boost::bind( CalculateFib2, 1 ) );
all the Fib2 jobs will always block, regardless of the number of threads, because all the Fib jobs had already finished before the Fib2 jobs were posted. A simple
io_service->post( boost::bind( CalculateFib, 1 ) );
will unlock all the waiters (that is, only as many as there are waiting threads, which is the number of available threads minus one, because the Fib() job occupies a thread as well). With fewer than 7 threads this would deadlock, because there is no thread available to even start the Fib() job on: all threads are blocked waiting in Fib2.
To be honest, I don't get what you are trying to achieve in terms of scheduling. I suspect you should be monitoring job queues and explicitly posting jobs ('tasks') only when you have reached the required number of items. That way you can keep it simple (KISS) and get a very flexible interface to your job scheduling.
In general, with a thread group (pool) you want to avoid blocking the threads for indefinite amounts of time. Doing so can deadlock your work scheduling, and it performs poorly even when it does not.
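One non-blocking way to follow that advice is to count finished jobs and let whichever job finishes last schedule the next batch. This is only a sketch under assumptions: BatchCounter and job_done are hypothetical names, and the "post the next batch" step is abstracted into a std::function so the example does not depend on Boost.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical helper (not from the original post): invokes `on_all_done`
// exactly once, from whichever job happens to finish last.
class BatchCounter {
public:
    BatchCounter(std::size_t jobs, std::function<void()> on_all_done)
        : remaining_(jobs), on_all_done_(std::move(on_all_done)) {}

    // Called as the last step of every first-phase job. No thread ever blocks.
    void job_done() {
        // fetch_sub returns the value held *before* the decrement.
        const std::size_t before = remaining_.fetch_sub(1);
        assert(before > 0 && "job_done() called more often than there are jobs");
        if (before == 1) {
            on_all_done_();  // this caller finished the batch: start phase two
        }
    }

private:
    std::atomic<std::size_t> remaining_;  // first-phase jobs still running
    std::function<void()> on_all_done_;   // assumed to post the next batch
};
```

In the question's program the callback would be the place to call io_service->post( boost::bind( CalculateFib2, 1 ) ) for each second-phase job, and CalculateFib would end with job_done() instead of cv.notify_all(). CalculateFib2 then needs no wait at all, so no worker thread sits idle.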