I am trying to implement a synchronized queue with condition variables using the Boost threading library, much like the example here (ImplementingThreadSafeQueue).
Background/Purpose: I am writing a Windows service as part of a senior design project. Throughout the service I would like to have various levels of logging (both to files and to the Windows Event Viewer). I am also using my own "EventTimer" wrapper around the "CreateTimerQueueTimer" function to create timed events, such as the service reporting a heartbeat. My idea is to push message objects onto a synchronized queue and have a logger class watch the queue on its own thread, waiting to perform the various logging tasks. For simplicity I am only testing with strings right now.
The Problem: The logger thread runs a method belonging to the logging class to grab work items from the queue. If I push items onto the queue from outside the class, let's say from an EventTimer thread or even from the main thread, the logger never gets notified of new items in the queue. However, if I create two threads belonging to the logger class and use one of those threads to push something onto the queue, the logger sees it and responds. I would like any thread to be able to add items to the queue and have the logger be notified of them.
My code is below. Any help would be appreciated. Thank you for your time!
Synchronized Queue Code
#ifndef _SYNCHRONIZED_QUEUE_
#define _SYNCHRONIZED_QUEUE_
// Include Files
#include <boost\noncopyable.hpp>
#include <boost\thread.hpp>
#include <queue>
namespace GSMV
{
///////////////////////////////////////////////////////////////////////////////////////
/// Class: SynchronizedQueue
///
/// @Brief
/// SynchronizedQueue is a thread safe STL Queue wrapper that waits on Dequeue and
/// notifies a listening thread on Enqueue. It is not copyable.
///////////////////////////////////////////////////////////////////////////////////////
template <typename T>
class SynchronizedQueue : private boost::noncopyable
{
public:
struct Canceled{};
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Constructor
///
/// @Brief
/// Default constructor for the SynchronizedQueue object.
///////////////////////////////////////////////////////////////////////////////////////
SynchronizedQueue(void)
{
// Queue is not canceled to start with
this->mCanceled = false;
// Nobody waiting yet
this->mWaiting = 0;
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Enqueue
///
/// @Param const T &item: Item of type T to add to queue.
///
/// @Brief
/// Adds an item of type T to the queue notifying via a condition.
///////////////////////////////////////////////////////////////////////////////////////
void Enqueue(const T &item)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
// add item to the queue
this->mQueue.push(item);
// notify others that queue has a new item
this->mItemAvailable.notify_one();
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Dequeue
///
/// @Return
/// Item of type T from front of queue.
///
/// @Brief
/// Returns an item of type T from the queue and deletes the front of the queue. Thread
/// will wait on an empty queue until it is signaled via Enqueue.
///////////////////////////////////////////////////////////////////////////////////////
T Dequeue(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
// one more thread is waiting on this item
++this->mWaiting;
// if the queue is empty, wait until an item is added
// lock is released inside the wait
// lock is re-acquired after the wait
while (this->mQueue.empty())
this->mItemAvailable.wait(lock);
// the thread is done waiting now
--this->mWaiting;
// retrieve and remove the item from the queue
T item = this->mQueue.front();
this->mQueue.pop();
return item;
// lock is released
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: GetSize
///
/// @Return
/// The current size of the queue (number of items in the queue).
///
/// @Brief
/// Returns the number of items contained in the queue.
///////////////////////////////////////////////////////////////////////////////////////
int GetSize(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
return this->mQueue.size();
// lock is released
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: IsEmpty
///
/// @Return
/// Boolean queue is empty.
///
/// @Brief
/// Returns true if queue is empty false otherwise.
///////////////////////////////////////////////////////////////////////////////////////
bool IsEmpty(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
return this->mQueue.empty();
// lock is released
}
void Cancel(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
this->mCanceled = true;
// wake all waiting threads so they can see the cancellation
this->mItemAvailable.notify_all();
while (0 < this->mWaiting)
this->mItemAvailable.wait(lock);
}
void Reset(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// reset the canceled flag
this->mCanceled = false;
}
private:
bool mCanceled;
int mWaiting;
std::queue<T> mQueue; // the STL Queue
boost::mutex mMutex; // the mutex object
boost::condition_variable mItemAvailable; // the signal condition
};
} // Namespace GSMV
#endif /// _SYNCHRONIZED_QUEUE_
Logger Code
#ifndef _LOGGER_H_
#define _LOGGER_H_
#include "SynchronizedQueue.h"
#include <string>
#include <boost\thread.hpp>
namespace GSMV
{
static SynchronizedQueue<std::string> logQ;
class Logger
{
public:
Logger(void);
~Logger(void);
bool Start(void);
bool Stop(void);
bool IsRunning(void) const;
void LoggerWorkThread(void);
private:
boost::thread* mpLoggerThread;
};
} // Namespace GSMV
#endif
// FILE END - logger.h //
#include "Logger.h"
#include <iostream> // needed for std::cout and std::endl
using namespace GSMV;
Logger::Logger(void)
{
this->mpLoggerThread = NULL;
}
Logger::~Logger(void)
{
this->Stop();
}
bool Logger::Start(void)
{
bool started = this->IsRunning();
if (!started)
{
this->mpLoggerThread = new boost::thread(&Logger::LoggerWorkThread, this);
started = (NULL != this->mpLoggerThread);
}
return started;
}
bool Logger::Stop(void)
{
bool stopped = !this->IsRunning();
if (!stopped)
{
this->mpLoggerThread->interrupt();
this->mpLoggerThread->join();
delete this->mpLoggerThread;
this->mpLoggerThread = NULL;
stopped = true;
}
return stopped;
}
bool Logger::IsRunning(void) const
{
return (NULL != this->mpLoggerThread);
}
void Logger::LoggerWorkThread(void)
{
std::cout << "Enter Logger Work Thread\n" << std::endl;
while (this->IsRunning())
{
std::cout << "LOG: wait for Q..." << std::endl;
std::string s = logQ.Dequeue();
std::cout << "LOG: Got item! => " << s << std::endl;
boost::this_thread::interruption_point();
}
std::cout << "Exit Logger Work Thread\n" << std::endl;
}
Using the above code, I create a Logger object and call its Start() method. Ideally this kicks off a new thread that loops, pulling string items from the queue, until Stop() is called. Back in my main function I push strings onto the queue and the logger should receive them, but the logger never gets notified. If it matters, the queue is declared in the Logger header file as "static SynchronizedQueue<std::string> logQ". Again, I would appreciate any suggestions here. Thanks!
You have to unlock the mutex before calling notify_one or notify_all on the condition variable.
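A minimal sketch of that change is below. It is an illustration under assumptions, not a drop-in fix: it uses std::mutex and std::condition_variable from C++11 in place of the Boost types so that it compiles without Boost, it leaves out the cancel logic, and the names MiniQueue and RunDemo are hypothetical. The lock is taken in an inner scope so it is released before notify_one is called.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Hypothetical stand-in for SynchronizedQueue (std:: types instead of boost::).
template <typename T>
class MiniQueue
{
public:
    void Enqueue(const T &item)
    {
        {
            // hold the lock only while touching the queue
            std::unique_lock<std::mutex> lock(mMutex);
            mQueue.push(item);
        } // lock is released here, before the notification
        mItemAvailable.notify_one();
    }

    T Dequeue()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        // wait() releases the lock while blocked and re-acquires it on wake-up
        while (mQueue.empty())
            mItemAvailable.wait(lock);
        assert(!mQueue.empty());
        T item = mQueue.front();
        mQueue.pop();
        return item;
    }

private:
    std::queue<T> mQueue;
    std::mutex mMutex;
    std::condition_variable mItemAvailable;
};

// Hypothetical demo: a consumer thread waits while the calling thread pushes one item.
inline std::string RunDemo()
{
    MiniQueue<std::string> q;
    std::string received;
    std::thread consumer([&] { received = q.Dequeue(); });
    q.Enqueue("heartbeat");
    consumer.join();
    return received;
}
```

Calling RunDemo() pushes one string from the calling thread and returns what the consumer thread received. Note that both Boost and the standard library also permit notifying while the lock is still held, so if this change alone does not help, it may be worth checking that every thread really uses the same queue object: a static object defined in a header gets a separate copy in each source file that includes it.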