c++ multithreading thread-safety pthreads producer-consumer

Producer/consumer: how to make sure a thread-safe queue is drained before turning off all consumers?


I am working on a project that uses a thread-safe queue. It is basically a producer/consumer problem.

My code currently looks like this:

void threadCode() // the consumer
{
    while (active) // active is an atomic int; we set it to 0 during destruction to turn everything off
    {
        threadSafeQueue.popFront(); // if the queue is empty, this waits on the queue's condition variable
        // process the item
        // if processing fails and the item has failed fewer than three times,
        // put it back in the queue to retry later
    }
}

The problem is that when my destructor sets active to 0, all threads terminate even if the queue is not empty. For example, a thread fails to process an item and puts it back in the queue, but by then active is already 0, so the item is never retried.

I don't want this to happen. I want the instance to be destroyed only after everything in the queue has been processed.

So I tried this:

void threadCode()
{
    while( active || queue.size() != 0 )
    { //[1]
        queue.popFront();
        //process
        // put it back to the queue if it fails less than 3 times
    }
}

queue.size() and queue.popFront() are each thread-safe, but calling them one after the other is not atomic. If there is only one item left in the queue and a context switch happens at [1], another thread may pop that item first. This thread then calls popFront() on an empty queue and may sleep forever.

Since I have something like threadpool.join() in the destructor and that thread never wakes up, the program just hangs there.

Does anyone have a better idea for solving this problem?

Thanks!!


Solution

  • Instead of having the consumer threads check for an external flag, have the queue itself maintain an internal "shutting down" flag. If there's no more work to process, then the .popFront() function returns a "shutting down" status instead of an item to process.