c++ multithreading c++11 ipc shared-ptr

Unable to add elements to a thread-safe locking queue of shared pointers


I'm trying to build inter-thread, message-based communication using C++11 concurrency techniques. Anthony Williams's book 'C++ Concurrency in Action' describes a thread-safe locking queue, which this implementation is based on. My version differs from the book's in two ways. First, I am using universal references to forward queue elements to the blocking queue. Second (and this is where things are going wrong), I need to store a queue of std::shared_ptr pointers, because the element types form a simple message class hierarchy: an abstract base class and subclasses holding the actual specialized messages. I need to use shared pointers to avoid object slicing.

EDIT: I added a coliru demo to show my problem more clearly. Live Demo

EDIT 1: more updates to the coliru live demo with additional compiler errors: Coliru Demo With Compiler Errors

EDIT 2: Thanks to Alejandro I have a working solution Working Coliru Example

To this end I changed Anthony Williams's implementation of the underlying message queue from:

std::queue<T> data_queue

to a

std::queue<std::shared_ptr<T>> data_queue

but when I attempt to push message pointers onto the queue via the universal-reference, perfect-forwarding signature, I get all sorts of errors.

This is how I would like to be able to add a message to the queue:

UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(message);

With the above code, the compiler complains along the following lines: error C2664: 'void UtlThreadSafeQueue::push(T &&)' : cannot convert argument 1 from 'std::shared_ptr' to 'BaseMessageType &&' with T=BaseMessageType

I think I need some way to specialize pointer types but I am not sure.

My implementation is as follows:

/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/

#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_

// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>

// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
    // Windows uses min/max macros
    #undef min
    #undef max
#endif

// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS

template<typename T>
class UtlThreadSafeQueue {
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;
    std::size_t capacity;
    std::atomic<bool> shutdownFlag;
public:
    explicit UtlThreadSafeQueue(const size_t& rCapacity =
        std::numeric_limits<std::size_t>::max())
        : mut()
        , data_queue()
        , data_cond()
        , capacity(rCapacity)
        , shutdownFlag(false)
    {}

    UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
        std::lock_guard<std::mutex> lock(rhs.mut);
        data_queue = rhs.data_queue;
    }

    virtual ~UtlThreadSafeQueue() = default;

    // move aware push
    inline void push(T&& value) {
        std::unique_lock<std::mutex> lock(mut);
        // only add the value on the stack if there is room
        data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
        data_queue.emplace(std::forward<T>(value));
        data_cond.notify_one();
    }

    // wait for non empty lambda condition before returning value
    inline void wait_and_pop(T& rValue) {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty();});
        // ideally should return an invalid value
        if (!shutdownFlag) {
            rValue = data_queue.front();
            data_queue.pop();
        }
    }

    // wait for non empty lambda condition before returning shared pointer to value
    inline std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
        if (shutdownFlag) {
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }
        return nullptr;
    }

    // return value in specified reference and flag indicating whether value
    // successfully returned or not
    inline bool try_pop(T& rValue) {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return false;
        }
        rValue = data_queue.front();
        data_queue.pop();
        return true;
    }

    // return shared pointer to value - which if set to nullptr,
    // indicates container was empty at the time of the call.
    inline std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    // thread safe method to check if the queue is empty
    // note that if it is empty
    inline bool empty() const {
        std::lock_guard<std::mutex> lock(mut);
        return data_queue.empty();
    }

    // shutdown support - wake up potentially sleeping queues
    inline void shutdown() {
        shutdownFlag = true;
        data_cond.notify_all();
    }
};

#endif // _utlThreadSafeQueue_h_

Solution

  • Following the extended discussion in the comments, and as per the Coliru links, I think I understand what you were originally attempting to do, and I'd also like to make some suggestions for your data structure.

    • You mentioned that your push() function is move aware. Excellent! But, be careful.

      If you look at how you defined your push function,

      inline void push(T&& value)
      

      there are a few things I'd like to point out here. The first is that this will only bind to rvalue references and not universal references (or forwarding references, as they will soon be called). Your use of std::forward within push is therefore inappropriate (albeit technically correct): the type T has already been fixed at the class level (when you instantiate a UtlThreadSafeQueue), so T&& here is a plain rvalue reference and std::forward<T> behaves just like std::move. To get perfect-forwarding semantics, you would need a push like the following:

      template<typename U>
      inline void push(U&& value) { ... }
      

      This version of push accepts any type of reference, as you would expect. However, its purpose is to forward its argument along to an appropriate constructor, function, etc. Since you wish to maintain an internal std::queue<std::shared_ptr<BaseMessage>>, you can have a push that accepts a reference (lvalue or rvalue) to a type derived from BaseMessage and emplaces a std::shared_ptr<DerivedType> into the queue. This establishes a pointer-to-base relationship (std::shared_ptr<BaseMessage> base_ptr = derived_ptr, where derived_ptr is of type std::shared_ptr<DerivedMessage>). This can be accomplished with the following:

      // needs <type_traits>; std::enable_if_t and std::decay_t are C++14
      template<typename U>
      inline
      std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value)
      {
        std::unique_lock<std::mutex> lock(mut);
        // only add the value to the queue if there is room
        data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
        data_queue.emplace(std::make_shared<std::decay_t<U>>(std::forward<U>(value)));
        data_cond.notify_one();
      }
      

      The use of std::enable_if_t makes sure that only types derived from BaseMessage are passed in to the push function. Emplacing std::make_shared<std::decay_t<U>>(std::forward<U>(value)) into the queue invokes one of std::shared_ptr's converting constructors (as listed Here), which turns the std::shared_ptr to the derived type into a std::shared_ptr<BaseMessage>.

      The great thing about this is that it allows you and your users to write code like this:

      UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10);
      StringMessage sm("Hi there!");
      IntegerMessage im(4242);
      dataLoadSessionQ.push(sm);
      dataLoadSessionQ.push(im);
      

      And it would behave as expected. Each Message is passed in by lvalue-ref and a shared_ptr is made by calling the copy-ctor of the derived type.
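
To make the behaviour described above concrete, here is a minimal, self-contained sketch. It is not your actual queue: MiniQueue, the describe() method, and the message classes are hypothetical stand-ins for the types in your Coliru demo, and capacity and shutdown handling are left out. I've also spelled the traits the C++11 way (std::enable_if<...>::type instead of std::enable_if_t) so it does not depend on C++14.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical message hierarchy, standing in for the one in the Coliru demo.
struct BaseMessage {
    virtual ~BaseMessage() = default;
    virtual std::string describe() const = 0;
};

struct StringMessage : BaseMessage {
    explicit StringMessage(std::string s) : text(std::move(s)) {}
    std::string describe() const override { return "string:" + text; }
    std::string text;
};

struct IntegerMessage : BaseMessage {
    explicit IntegerMessage(int v) : value(v) {}
    std::string describe() const override { return "int:" + std::to_string(value); }
    int value;
};

// Stripped-down queue (no capacity, no shutdown) that keeps only the push logic.
template <typename T>
class MiniQueue {
public:
    // U&& is a forwarding reference because U is deduced per call.
    // The enable_if restricts U to T itself or to types derived from T.
    template <typename U>
    typename std::enable_if<
        std::is_base_of<T, typename std::decay<U>::type>::value>::type
    push(U&& value) {
        using Stored = typename std::decay<U>::type;
        std::lock_guard<std::mutex> lock(mut_);
        // Copies an lvalue, moves an rvalue; the shared_ptr<Stored> then
        // converts to shared_ptr<T>, so nothing is sliced.
        data_.emplace(std::make_shared<Stored>(std::forward<U>(value)));
    }

    // Returns nullptr when the queue is empty.
    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut_);
        if (data_.empty()) return nullptr;
        std::shared_ptr<T> res = std::move(data_.front());
        data_.pop();
        return res;
    }

private:
    std::mutex mut_;
    std::queue<std::shared_ptr<T>> data_;
};

// Pushes one lvalue and one temporary, pops both, and joins their descriptions.
inline std::string demo_push() {
    MiniQueue<BaseMessage> q;
    StringMessage sm("Hi there!");
    q.push(sm);                    // lvalue: copy-constructs a StringMessage
    q.push(IntegerMessage(4242));  // rvalue: constructed from the temporary
    std::string out = q.try_pop()->describe();
    out += "|";
    out += q.try_pop()->describe();
    assert(q.try_pop() == nullptr);  // both messages have been consumed
    return out;
}
```

Calling demo_push() from main should give back the two descriptions in FIFO order, each produced by the derived class's override, which is the evidence that no slicing took place.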

    • You're exposing a push interface that accepts not just a std::shared_ptr, but a std::shared_ptr&& which has some subtleties.

      At first glance, it may seem as if I wouldn't be able to do this (borrowing the StringMessage and BaseMessage types from your Coliru links):

        UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
        auto my_message = std::make_shared<StringMessage>("Another message!");
        dataLoadSessionQ.push(my_message);
      

      Despite the fact that push is defined to take an rvalue reference to a shared_ptr, this code compiles when passed my_message (which is an lvalue, not an rvalue!). The reason was not immediately clear to me. But, as it turns out, shared_ptr supports pointer conversions much like raw pointers do. Similar to static_cast, there's a static_pointer_cast defined for shared_ptr, which looks like the following (borrowed from Here):

      template< class T, class U > 
      std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );
      

      static_pointer_cast performs an explicit conversion from a shared_ptr<U> to a shared_ptr<T> when the underlying pointer conversion is valid. In your case no explicit cast is even needed: because your Coliru example uses a std::queue<std::shared_ptr<BaseMessage>> internally, and you're trying to push a shared_ptr<StringMessage>, shared_ptr's converting constructor performs the implicit conversion to shared_ptr<BaseMessage>, which succeeds since StringMessage inherits from BaseMessage. The result is a temporary std::shared_ptr<BaseMessage> that shares ownership with my_message, and that temporary will happily bind to the rvalue reference.
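
Here is a small sketch of that conversion in isolation. The names Base, Derived, and take are hypothetical; take is only a stand-in for a push that accepts std::shared_ptr<Base>&&. It assumes nothing about your queue beyond that signature.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct Base { virtual ~Base() = default; };
struct Derived : Base {};

// Hypothetical stand-in for push(std::shared_ptr<T>&&): it accepts rvalues
// only and takes ownership of whatever it is handed.
inline std::shared_ptr<Base> take(std::shared_ptr<Base>&& p) {
    return std::move(p);
}

// Returns the owner count after passing an lvalue shared_ptr<Derived>.
inline long demo_conversion() {
    auto derived = std::make_shared<Derived>();
    // derived is an lvalue of a different type, so the compiler builds a
    // temporary shared_ptr<Base> from it (a copy that shares ownership) and
    // binds that temporary to the rvalue-reference parameter.
    std::shared_ptr<Base> stored = take(derived);
    assert(derived != nullptr);              // the caller's pointer is untouched
    assert(stored.get() == derived.get());   // both refer to the same object
    return derived.use_count();              // two owners: derived and stored
}
```

The point to take away is that the caller's pointer is copied, not moved from: the rvalue-reference signature ends up costing a reference-count increment here instead of saving one.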

      Notice that if you instead try this:

      UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
      auto generic_message = std::make_shared<BaseMessage>(); 
      dataLoadSessionQ.push(generic_message);
      

      You get the compiler error we (or maybe it was just me) were expecting initially:

    error: cannot bind 'std::shared_ptr' lvalue to 'std::shared_ptr&&' dataLoadSessionQ.push(generic_message);
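
If you did want to keep the rvalue-only signature, the caller would have to state explicitly whether it is copying or giving up ownership. A rough sketch, with a hypothetical Sink type standing in for the queue (Base is made concrete here purely so the example can construct one):

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>

struct Base { virtual ~Base() = default; };

// Hypothetical rvalue-only sink, mirroring push(std::shared_ptr<T>&&).
struct Sink {
    void push(std::shared_ptr<Base>&& p) { items.push(std::move(p)); }
    std::queue<std::shared_ptr<Base>> items;
};

// Returns true when both workarounds behave as the comments describe.
inline bool demo_exact_type() {
    Sink sink;
    auto a = std::make_shared<Base>();
    auto b = std::make_shared<Base>();

    // sink.push(a);  // error: a is an lvalue of the exact parameter type,
    //                // so no conversion happens and no temporary is created
    sink.push(std::shared_ptr<Base>(a));  // explicit copy: a stays an owner
    sink.push(std::move(b));              // transfer: b is left empty

    assert(sink.items.size() == std::size_t(2));
    return a.use_count() == 2 && b == nullptr;
}
```

Both spellings work, but they push a detail of the queue's internals onto every caller, which is part of why I prefer the forwarding push shown earlier.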

    I honestly can't find a good reason, from either a performance or an aesthetic standpoint, to require passing a shared_ptr<Derived> to a UtlThreadSafeQueue<Base>. I expect to be able to pass in both a temporary Derived and an lvalue, and not fret about the internals of the queue too much. You can also capitalize on this in the wait_and_pop()/try_pop() overloads that return shared_ptrs by std::move-ing the value out of data_queue.front() (since it was going to be destroyed anyway by the next call to data_queue.pop()).
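
As a sketch of what I mean by moving out of front(), here is a cut-down queue showing only the pop side. PopDemoQueue is a hypothetical name and the blocking and shutdown logic is omitted. Note that it hands back the stored pointer itself instead of calling std::make_shared<T> on it.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical cut-down queue showing only the pop side.
template <typename T>
class PopDemoQueue {
public:
    void push(std::shared_ptr<T> p) {
        std::lock_guard<std::mutex> lock(mut_);
        data_.push(std::move(p));
    }

    // Returns an empty pointer when the queue is empty.
    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut_);
        if (data_.empty()) return std::shared_ptr<T>();
        // Steal the pointer from the front slot. The slot is destroyed by
        // pop() right after, so no reference-count increment is needed.
        std::shared_ptr<T> res(std::move(data_.front()));
        data_.pop();
        return res;
    }

private:
    std::mutex mut_;
    std::queue<std::shared_ptr<T>> data_;
};

// Returns the owner count of the popped element.
inline long demo_move_pop() {
    PopDemoQueue<int> q;
    q.push(std::make_shared<int>(7));
    std::shared_ptr<int> p = q.try_pop();
    assert(p && *p == 7);
    assert(q.try_pop() == nullptr);  // the queue is now empty
    return p.use_count();            // the caller is the only owner left
}
```

The same two lines (move from front(), then pop()) would slot into the blocking wait_and_pop() once the condition variable wait has returned.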

    In your UtlThreadSafeQueue constructor, I would also consider changing the const std::size_t& to just a by-value std::size_t and likewise for the (example) IntegerMessage type.

    With this in mind, I would appreciate any feedback on the changes I highlighted above. Quite frankly, I couldn't grok your goal or implementation until much later, when you posted more examples and kept editing the question.