I am having a problem with pthreads where i think i am getting a deadlock. I have created a blocking queue which I thought was working, but after doing some more testing I have found that if i try and cancel multiple threads that are blocking on the blocking_queue, i seem to get a deadlock.
The blocking queue is very simple and looks like this:
template <class T> class Blocking_Queue
{
public:
Blocking_Queue()
{
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_cond, NULL);
}
~Blocking_Queue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
void put(T t)
{
pthread_mutex_lock(&_lock);
_queue.push(t);
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_lock);
}
T pull()
{
pthread_mutex_lock(&_lock);
while(_queue.empty())
{
pthread_cond_wait(&_cond, &_lock);
}
T t = _queue.front();
_queue.pop();
pthread_mutex_unlock(&_lock);
return t;
}
priavte:
std::queue<T> _queue;
pthread_cond_t _cond;
pthread_mutex_t _lock;
}
For testing, I have created 4 threads that pull on this blocking queue. I added some print statements to the blocking queue and each thread is getting to the pthread_cond_wait() method. However, when i try to call pthread_cancel() and pthread_join() on each thread the program just hangs.
I have also tested this with just one thread and it works perfectly.
According to documentation, pthread_cond_wait() is a cancellation point, so calling cancel on those threads should cause them to stop execution (and this does work with just 1 thread). However pthread_mutex_lock is not a cancelation point. Could something be happening along the lines of when pthread_cancel() is called, the canceled thread aquires the mutex before terminating and doesn't unlock it, and then when the next thread gets canceled it cannot aquire the mutex and deadlocks? Or is there something else that I am doing wrong.
Any advice would be lovely. Thanks :)
pthread_cancel()
is best avoided.
You can unblock all your threads blocked on Blocking_Queue::pull() by throwing an exception from there.
One weak spot in the queue is that T t = _queue.front();
invokes the copy constructor of T that may throw an exception, rendering you queue mutex locked forever. Better use C++ scoped locks.
Here is an example of graceful thread termination:
$ cat test.cc
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <exception>
#include <list>
#include <stdio.h>
struct BlockingQueueTerminate
: std::exception
{};
template<class T>
class BlockingQueue
{
private:
boost::mutex mtx_;
boost::condition_variable cnd_;
std::list<T> q_;
unsigned blocked_;
bool stop_;
public:
BlockingQueue()
: blocked_()
, stop_()
{}
~BlockingQueue()
{
this->stop(true);
}
void stop(bool wait)
{
// tell threads blocked on BlockingQueue::pull() to leave
boost::mutex::scoped_lock lock(mtx_);
stop_ = true;
cnd_.notify_all();
if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
while(blocked_)
cnd_.wait(lock);
}
void put(T t)
{
boost::mutex::scoped_lock lock(mtx_);
q_.push_back(t);
cnd_.notify_one();
}
T pull()
{
boost::mutex::scoped_lock lock(mtx_);
++blocked_;
while(!stop_ && q_.empty())
cnd_.wait(lock);
--blocked_;
if(stop_) {
cnd_.notify_all(); // tell stop() this thread has left
throw BlockingQueueTerminate();
}
T front = q_.front();
q_.pop_front();
return front;
}
};
void sleep_ms(unsigned ms)
{
// i am using old boost
boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(ms));
// with latest one you can do this
//boost::thread::sleep(boost::posix_time::milliseconds(10));
}
void thread(int n, BlockingQueue<int>* q)
try
{
for(;;) {
int m = q->pull();
printf("thread %u: pulled %d\n", n, m);
sleep_ms(10);
}
}
catch(BlockingQueueTerminate&)
{
printf("thread %u: finished\n", n);
}
int main()
{
BlockingQueue<int> q;
// create two threads
boost::thread_group tg;
tg.create_thread(boost::bind(thread, 1, &q));
tg.create_thread(boost::bind(thread, 2, &q));
for(int i = 1; i < 10; ++i)
q.put(i);
sleep_ms(100); // let the threads do something
q.stop(false); // tell the threads to stop
tg.join_all(); // wait till they stop
}
$ g++ -pthread -Wall -Wextra -o test -lboost_thread-mt test.cc
$ ./test
thread 2: pulled 1
thread 1: pulled 2
thread 1: pulled 3
thread 2: pulled 4
thread 1: pulled 5
thread 2: pulled 6
thread 1: pulled 7
thread 2: pulled 8
thread 1: pulled 9
thread 2: finished
thread 1: finished