I'm trying to create an inter thread message based communications using C++11 concurrency techniques. Anthony William's book 'Concurrency in Action' describes a thread safe locking queue which this implementation is based on. The difference between the thread safe locking queue that is described in the book and the one I want to implement is firstly that I am using universal references to forward queue elements to the blocking queue, and secondly (and this is where things are going wrong) I need to be able to store a queue of std::shared_ptr pointers as the template types consist of a simple message class hierarchy with an abstract base class and sub-classes with the actual specialized messages. I need to use shared pointers to avoid data slicing.
EDIT: I added a coliru demo to show my problem more clearly. Live Demo
EDIT 1: more updates to the coliru live demo with additional compiler errors: Coliru Demo With Compiler Errors
EDIT 2: Thanks to Alejandro I have a working solution Working Coliru Example
To this end I changed Anthony William's implementation of the underlying message queue from:
std::queue<T> data_queue
to a
std::queue<std::shared_ptr<T>> data_queue
but then when I attempt to push message pointers on the queue via the universal reference perfect forwarding signature, I get all sorts of errors.
The way I would like to be able to add message on this queue is as follows:
UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);
With the above code, the compiler complains indicating something along the following lines error C2664: 'void UtlThreadSafeQueue::push(T &&)' : cannot convert argument 1 from 'std::shared_ptr' to 'BaseMessageType &&' with T=BaseMessageType
I think I need some way to specialize pointer types but I am not sure.
My implementation is as follows:
/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/
#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_
// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>
// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
// Windows uses min/max macros
#undef min
#undef max
#endif
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
template<typename T>
class UtlThreadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
std::size_t capacity;
std::atomic<bool> shutdownFlag;
public:
explicit UtlThreadSafeQueue(const size_t& rCapacity =
std::numeric_limits<std::size_t>::max())
: mut()
, data_queue()
, data_cond()
, capacity(rCapacity)
, shutdownFlag(false)
{}
UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
std::lock_guard<std::mutex> lock(rhs.mut);
data_queue = rhs.data_queue;
}
virtual ~UtlThreadSafeQueue() = default;
// move aware push
inline void push(T&& value) {
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::forward<T>(value));
data_cond.notify_one();
}
// wait for non empty lambda condition before returning value
inline void wait_and_pop(T& rValue) {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty();});
// ideally should return an invalid value
if (!shutdownFlag) {
rValue = data_queue.front();
data_queue.pop();
}
}
// wait for non empty lambda condition before returning shared pointer to value
inline std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
if (shutdownFlag) {
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
return nullptr;
}
// return value in specified reference and flag indicating whether value
// successfully returned or not
inline bool try_pop(T& rValue) {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return false;
}
rValue = data_queue.front();
data_queue.pop();
return true;
}
// return shared pointer to value - which if set to nullptr,
// indicates container was empty at the time of the call.
inline std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// thread safe method to check if the queue is empty
// note that if it is empty
inline bool empty() const {
std::lock_guard<std::mutex> lock(mut);
return data_queue.empty();
}
// shutdown support - wake up potentially sleeping queues
inline void shutdown() {
shutdownFlag = true;
data_cond.notify_all();
}
};
#endif // _utlThreadSafeQueue_h_
Following the extended discussion in the comments, and as per the Coliru links, I think I understand what you were originally attempting to do, and I'd also like to make some suggestions for your data structure.
You mentioned that your push()
function is move aware. Excellent! But, be careful.
If you look at how you defined your push
function,
inline void push(T&& value)
there's a few things I'd like to point out here. The first is that this will only bind to r-value references and not universal references (or, forwarding references , as they will soon be called ). Your use of std::forward
within push
is the inappropriate thing to do (albeit technically correct). The reason is that the type T
will have already been deduced at the class level ( when you instantiate a UtlThreadSafeQueue
). To get perfect-forwarding semantics, you would need a push
like the following:
template<typename U>
inline void push(U&& value) { ... }
This version of push
accepts any type of reference, as you would expect. However, it's use would be to forward along any arguments to an appropriate constructor/function/etc. Since you wish to maintain an internal std::queue<std::shared_ptr<BaseMessage>>
, you can have a push
that accepts a reference (lvalue or rvalue) to a derived type of BaseMessage
, and emplace a std::shared_ptr<DerivedType>
into the queue. This would establish a pointer-to-base relationship (std::shared_ptr<BaseMessage> base_ptr = derived_ptr
, where derived_ptr
is of type std::shared_ptr<DerivedMessage>
). This can be accomplished with the following:
template<typename U>
inline
std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value)
{
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::make_shared<std::decay_t<U>> (std::forward<U>(value)));
data_cond.notify_one();
}
The use of std::enable_if_t
makes sure that only types that are derived from BaseMessage
are passed in to the push
function. The emplacement into the queue of std::make_shared<std::decay_t<U>> (std::forward<U>(value))
will call the 9th constructor of std::shared_ptr
(as listed Here ).
The great thing about this, is that it will allow you and your users to write code like this:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10);
StringMessage sm("Hi there!");
IntegerMessage im(4242);
dataLoadSessionQ.push(sm);
dataLoadSessionQ.push(im);
And it would behave as expected. Each Message is passed in by lvalue-ref and a shared_ptr
is made by calling the copy-ctor of the derived type.
You're exposing a push
interface that accepts not just a std::shared_ptr
, but a std::shared_ptr&&
which has some subtleties.
At first glance, it may seem as if I wouldn't be able to do this ( borrowing the StringMessage
and BaseMessage
types from your Coliru links):
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto my_message = std::make_shared<StringMessage>("Another message!");
dataLoadSessionQ.push(my_message);
Despite the fact that push
is defined to take an r-value reference to a shared_ptr
, this code compiles by passing it my_message
(which is not an r-value reference!). And the reason was not immediately clear to me at first. But, as it turns out, similar to static_cast
, there's a static_pointer_cast
defined for shared_ptr
which looks like the following ( borrowed from Here ):
template< class T, class U >
std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );
It will perform a conversion from a shared_ptr<U>
to a shared_ptr<T>
if such a conversion succeeds. Because your Coliru example uses a std::queue<std::shared_ptr<BaseMessage>>
internally, and you're trying to push a shared_ptr<StringMessage>
, the implicit conversion to shared_ptr<BaseMessage>
succeeds since StringMessage
inherits from BaseMessage
. The conversion returns an alias-constructedstd::shared_ptr<BaseMessage>
, which will happily bind to the rvalue-reference.
Notice that if you instead try this:
UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
auto generic_message = std::make_shared<BaseMessage>();
dataLoadSessionQ.push(generic_message);
You get the compiler error we ( or maybe it was just me) were expecting initially
error: cannot bind 'std::shared_ptr' lvalue to 'std::shared_ptr&&' dataLoadSessionQ.push(generic_message);
I honestly can't find a good reason from either a performance or aesthetic aspect to have to pass in a shared_ptr<Derived>
to a UtlThreadSafeQueue<Base>
. I expect to be able to pass in both a temporary Derived
and an lvalue, and not fret with the internals of the queue too much.
You can also capitalize in the wait_and_pop()
/try_pop()
that return shared_ptr
s by std::move
ing the value from data_queue.front()
(since it was going to be destructed anyways in the next call to data_queue.pop()
)
In your UtlThreadSafeQueue
constructor, I would also consider changing the const std::size_t&
to just a by-value std::size_t
and likewise for the (example) IntegerMessage
type.
With this in mind, I would appreciate any feedback to the changes I highlighted above - I just quite frankly couldn't grok your goal / implementation until much later when you posted more examples and kept editing the question.