Search code examples
boost boost-interprocess

spsc_queue over shared_memory


I have been trying for several hours, and this problem is driving me crazy.

I want to create an spsc_queue in shared memory, where each element of the queue is the mq_item_t structure below.

// One element of the message queue. `type` sits next to an anonymous union of
// three payload layouts (log / error / data); presumably it selects which one
// is active -- mq_item_type and log_level_t are declared elsewhere.
// NOTE: every char* member is a raw pointer, so an item stored in shared
// memory carries addresses that are only meaningful inside the process that
// filled it in.
typedef struct _mq_item_t{
    mq_item_type type;

    union
    {
        // Log message: a level and a text pointer.
        struct{
            log_level_t level;
            char *      text;
        } log;
        // Error message: two pointer/size pairs (control and payload).
        struct{
            char *      control;
            size_t      control_size;
            char *      payload;
            size_t      payload_size;
        } error;
        // Data message: a channel pointer plus the same control/payload pairs.
        struct{
            char *      channel;
            char *      control;
            size_t      control_size;
            char *      payload;
            size_t      payload_size;
        } data;
    };
} mq_item_t;

Then I have the following code to create the spsc_queue.

// Managed segment type used for the shared memory.
typedef boost::interprocess::managed_windows_shared_memory native_managed_shared_memory;
// Allocator that obtains mq_item_t storage from the segment's segment_manager.
typedef boost::interprocess::allocator<mq_item_t, native_managed_shared_memory::segment_manager> shmem_allocator;
// Runtime-sized spsc_queue parameterised with that allocator. The compiler
// output quoted below rejects this instantiation: ringbuffer_base<T>::pop
// expects an mq_item_t*, but it is handed an offset_ptr<mq_item_t>.
typedef boost::lockfree::spsc_queue< mq_item_t, boost::lockfree::allocator<shmem_allocator>> lockfree_queue;

// m_segment, mem_name, SHARED_MEMORY_BYTES and perm are declared elsewhere.
m_segment = new native_managed_shared_memory(create_only, mem_name, SHARED_MEMORY_BYTES, NULL, perm);
shmem_allocator alloc(m_segment->get_segment_manager());
// Construct (or look up) the named queue inside the segment, passing 65535
// and the allocator to its constructor.
m_segment->find_or_construct<lockfree_queue>("name of the queue")(65535, alloc);

This results in a compilation error. What am I doing wrong here?

boost/lockfree/spsc_queue.hpp(609): error C2664: 'boost::lockfree::detail::ringbuffer_base<T>::pop' : cannot convert parameter 3 from 'boost::interprocess::offset_ptr<PointedType,DifferenceType,OffsetType,OffsetAlignment>' to 'mq_item_t *'
1>          with
1>          [
1>              T=mq_item_t
1>          ]
1>          and
1>          [
1>              PointedType=mq_item_t,
1>              DifferenceType=ptrdiff_t,
1>              OffsetType=size_t,
1>              OffsetAlignment=0x00
1>          ]
1>          No user-defined-conversion operator available that can perform this conversion, or the operator cannot be called
1>          boost/lockfree/spsc_queue.hpp(608) : while compiling class template member function 'boost::lockfree::detail::runtime_sized_ringbuffer<T,Alloc>::size_type boost::lockfree::detail::runtime_sized_ringbuffer<T,Alloc>::pop(T *,boost::lockfree::detail::runtime_sized_ringbuffer<T,Alloc>::size_type)'
1>          with
1>          [
1>              T=mq_item_t,
1>              Alloc=boost::interprocess::allocator<mq_item_t,boost::interprocess::segment_manager<char,boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,boost::interprocess::iset_index>>
1>          ]
1>          boost/lockfree/spsc_queue.hpp(681) : see reference to class template instantiation 'boost::lockfree::detail::runtime_sized_ringbuffer<T,Alloc>' being compiled
1>          with
1>          [
1>              T=mq_item_t,
1>              Alloc=boost::interprocess::allocator<mq_item_t,boost::interprocess::segment_manager<char,boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,boost::interprocess::iset_index>>
1>          ]
1>          boost/interprocess/detail/named_proxy.hpp(213) : see reference to class template instantiation 'boost::lockfree::spsc_queue<T,A0>' being compiled
1>          with
1>          [
1>              T=mq_item_t,
1>              A0=boost::lockfree::allocator<shmem_allocator>
1>          ]
1>          boost/interprocess/detail/named_proxy.hpp(213) : while compiling class template member function 'void boost::interprocess::ipcdetail::CtorArg2<T,P0,P1>::construct_n(void *,size_t,size_t &)'
1>          with
1>          [
1>              T=lockfree_queue,
1>              P0=int,
1>              P1=boost::interprocess::allocator<mq_item_t,boost::interprocess::segment_manager<char,boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,boost::interprocess::iset_index>> &
1>          ]
1>          boost/interprocess/detail/named_proxy.hpp(282) : see reference to class template instantiation 'boost::interprocess::ipcdetail::CtorArg2<T,P0,P1>' being compiled
1>          with
1>          [
1>              T=lockfree_queue,
1>              P0=int,
1>              P1=boost::interprocess::allocator<mq_item_t,boost::interprocess::segment_manager<char,boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,boost::interprocess::iset_index>> &
1>          ]
1>          ..\common\sink.cpp(26) : see reference to function template instantiation 'T *boost::interprocess::ipcdetail::named_proxy<SegmentManager,T,is_iterator>::operator ()<int,shmem_allocator&>(P0 &&,P1) const' being compiled
1>          with
1>          [
1>              T=lockfree_queue,
1>              SegmentManager=boost::interprocess::segment_manager<char,boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,boost::interprocess::iset_index>,
1>              is_iterator=false,
1>              P0=int,
1>              P1=shmem_allocator &
1>          ]

Solution

  • I've shown how to do this before: Shared-memory IPC synchronization (lock-free)

    At the moment I can't seem to make the lockfree::allocator<> version work, because spsc_queue doesn't appear to work when non-raw pointers (offset_ptr<>) are involved.

    Strangely, I couldn't make it work with older Boost versions either, so now I wonder whether it ever worked. But that's not really relevant:

    However, there are some bigger issues to address:

    • Because the shared memory area has a fixed size, it makes sense to fix the spsc_queue capacity statically too. To this end, use boost::lockfree::capacity<65535> instead of boost::lockfree::allocator<>. This does compile, but

    • Your mq_item_t is littered with raw pointers. That will never work, since the pointers will not be valid in another process's address space. I'd suggest something like:

      // Allocator handing out chars from the managed segment (msm_t is the segment type).
      typedef boost::interprocess::allocator<char, msm_t::segment_manager> char_alloc;
      // String and byte-blob types whose storage is obtained through char_alloc.
      typedef boost::interprocess::basic_string<char, std::char_traits<char>, char_alloc> shmem_string;
      typedef boost::interprocess::vector<char, char_alloc> shmem_blob;
      
      // Log message: a level plus text held in a segment-allocated string.
      struct log {
          typedef char_alloc allocator_type;
      
          // Message whose text is created empty with `alloc`.
          log(char_alloc alloc);
          // Message with the given level and a copy of `txt` made through `alloc`.
          log(log_level_t level, const char* txt, char_alloc alloc);
      
          log_level_t level;
          shmem_string text;
      };
      
      // Error message: control and payload bytes held in segment-allocated blobs.
      struct error {
          typedef char_alloc allocator_type;
      
          // Message with both blobs created empty with `alloc`.
          error(char_alloc alloc);
      
          // Message whose blobs are filled from the ranges `ctl` and `pl`.
          template <typename Ctl, typename Payload>
              error(Ctl const& ctl, Payload const& pl, char_alloc alloc);
      
          shmem_blob control, payload;
      };
      
      // Data message: a channel name plus control and payload blobs, all
      // obtaining their storage through char_alloc.
      struct data {
          typedef char_alloc allocator_type;
      
          // Message with all members created empty with `alloc`.
          data(char_alloc alloc);
      
          // Message with a copy of `channel` and blobs filled from `ctl` and `pl`.
          template <typename Ctl, typename Payload>
              data(char const* channel, Ctl const& ctl, Payload const& pl, char_alloc alloc);
      
          shmem_string channel;
          shmem_blob control, payload;
      };
      
      // Element type: holds exactly one of the three message kinds.
      typedef boost::variant<log, error, data> item_t;
      

    See a full demo live:

    Live On Coliru

    #include <boost/lockfree/spsc_queue.hpp>
    #include <boost/interprocess/managed_shared_memory.hpp>
    #include <boost/interprocess/managed_mapped_file.hpp>
    #include <boost/interprocess/containers/string.hpp>
    #include <boost/interprocess/containers/vector.hpp>
    #include <boost/variant.hpp>
    #include <boost/range.hpp>
    #include <string>
    #include <iostream>
    
    // Short alias for the Boost.Interprocess namespace.
    namespace bip = boost::interprocess;
    using boost::lockfree::spsc_queue;
    
    // msm_t is the managed segment type used by the rest of the demo: a
    // managed mapped file when COLIRU is defined, managed shared memory
    // otherwise. NOTE(review): presumably the online sandbox cannot use real
    // shared memory -- confirm.
    #ifdef COLIRU
    typedef bip::managed_mapped_file msm_t;
    #else
    typedef bip::managed_shared_memory msm_t;
    #endif
    
    /// Message types that can be stored in a queue living in the managed
    /// segment. Every variable-length member takes its memory through
    /// `char_alloc`, i.e. from the segment manager of `msm_t`, instead of
    /// holding raw pointers.
    namespace mq {
        enum log_level_t { default_level };
    
        /// Allocates chars from the managed segment's segment_manager.
        typedef boost::interprocess::allocator<char, msm_t::segment_manager> char_alloc;
        /// Text and byte-blob containers whose storage comes from `char_alloc`.
        typedef boost::interprocess::basic_string<char, std::char_traits<char>, char_alloc> shmem_string;
        typedef boost::interprocess::vector<char, char_alloc> shmem_blob;
    
        // Make unqualified begin(x)/end(x) in the constructors below resolve
        // to the Boost.Range versions.
        using boost::begin;
        using boost::end;
    
        /// Log message: a level plus text.
        struct log {
            typedef char_alloc allocator_type;
    
            /// Empty message. `level` is initialised explicitly so that copying
            /// the object (e.g. into the variant) never reads an indeterminate
            /// value.
            log(char_alloc alloc) : level(default_level), text(alloc) {}
            /// Message with the given level and a copy of `txt` made via `alloc`.
            log(log_level_t level, const char* txt, char_alloc alloc) : level(level), text(txt, alloc) {}
    
            log_level_t level;
            shmem_string text;
        };
    
        /// Error message: control and payload bytes.
        struct error {
            typedef char_alloc allocator_type;
    
            /// Message with both blobs empty.
            error(char_alloc alloc) : control(alloc), payload(alloc) {}
    
            /// Fills the blobs from the ranges `ctl` and `pl`.
            /// NOTE(review): for a string literal the range is the whole char
            /// array, which appears to include the terminating '\0' -- confirm
            /// that this is what consumers expect.
            template <typename Ctl, typename Payload>
            error(Ctl const& ctl, Payload const& pl, char_alloc alloc) 
                : control(begin(ctl), end(ctl), alloc),
                  payload(begin(pl), end(pl), alloc)
            { }
    
            shmem_blob control, payload;
        };
    
        /// Data message: a channel name plus control and payload bytes.
        struct data {
            typedef char_alloc allocator_type;
    
            /// Message with all members empty.
            data(char_alloc alloc) : channel(alloc), control(alloc), payload(alloc) {}
    
            /// Copies `channel` and fills the blobs from the ranges `ctl` and
            /// `pl` (same range semantics as in `error`).
            template <typename Ctl, typename Payload>
            data(char const* channel, Ctl const& ctl, Payload const& pl, char_alloc alloc) 
                : channel(channel, alloc),
                  control(begin(ctl), end(ctl), alloc),
                  payload(begin(pl), end(pl), alloc)
            { }
    
            shmem_string channel;
            shmem_blob control, payload;
        };
    
        /// Queue element: holds exactly one of the three message kinds.
        typedef boost::variant<log, error, data> item_t;
    }
    
    namespace {
        // Name passed to the managed segment constructor in main().
        // (The unnamed namespace already gives both entities internal linkage.)
        char const* mem_name = "21845989-f5e0-4c84-b170-cd34dc9f37fc";
        // Size passed to the managed segment constructor: 10 KiB.
        constexpr size_t SHARED_MEMORY_BYTES = 10ul * 1024;
    }
    
    int main()
    {
        typedef boost::interprocess::allocator<mq::item_t, msm_t::segment_manager> shmem_allocator;
    #ifdef COLIRU
        typedef boost::lockfree::spsc_queue<mq::item_t, boost::lockfree::capacity<10> > lockfree_queue;
    #else
        typedef boost::lockfree::spsc_queue<mq::item_t, boost::lockfree::capacity<65535> > lockfree_queue;
    #endif
    
        msm_t m_segment(bip::create_only, mem_name, SHARED_MEMORY_BYTES);
        shmem_allocator alloc(m_segment.get_segment_manager());
    
        auto queue = m_segment.find_or_construct<lockfree_queue>("name of the queue")();
    
        queue->push(mq::error("control", "payload", alloc));
        queue->push(mq::data("channel", "control", "payload", alloc));
        queue->push(mq::log(mq::default_level, "text", alloc));
    }