Getting wrong output from boost lock free spsc queue


I am trying to implement a lock-free queue of a user-defined data type using the Boost library, but I am getting the wrong result.

Please help me find what I am doing wrong.

#include <boost/lockfree/spsc_queue.hpp>
#include <thread>
#include <iostream>
#include <string.h>
#include <time.h>   


class Queue
{
 private:
       unsigned char *m_data;
       int m_len;

 public:
       Queue(unsigned char *data,int len);

       Queue(const Queue &obj);

       ~Queue();  

       Queue & operator =(const Queue &obj);


       unsigned char *getdata()
       {
         return m_data;
       }

       int getint()
       {
           return m_len;
       }
};

Queue::Queue(unsigned char* data, int len)
{
      m_len=len;

      m_data=new unsigned char[m_len];

      memcpy(m_data,data,m_len);
}

Queue::Queue(const Queue& obj)
{
      m_len=  obj.m_len;

      m_data=new unsigned char[m_len];

      memcpy(m_data,(unsigned char *)obj.m_data,m_len);
}

Queue::~Queue()
{
   delete[] m_data;
   m_len=0;
}

Queue & Queue::operator =(const Queue &obj)
{
   if(this != &obj)
   {
      m_len=obj.m_len;

      m_data=new unsigned char[m_len];

      memcpy(m_data,(unsigned char *)obj.m_data,m_len);
   }

   return *this;
}

boost::lockfree::spsc_queue<Queue*> q(10);



void produce()
{
    int i=0;
    unsigned char* data=(unsigned char *)malloc(10);
    memset(data,1,9);

    Queue obj(data,10);

    Queue *pqueue=&obj;

    printf("%d\n",pqueue->getint());


    q.push(pqueue);

}

void consume()
{

    Queue *obj;

    q.pop(&obj);

    printf("%d\n",obj->getint());

}


int main(int argc, char** argv) {


//  std::thread t1{produce};
//  std::thread t2{consume};
//  
//  t1.join();
//  t2.join();
    produce();
    consume();

    return 0;
}

As per the boost::lockfree::queue requirements, I created the following in the class:

  • Copy Constructor
  • Assignment Operator
  • Destructor

Please let me know if anything else is required. Thanks.


Solution

    1. You're using malloc in C++.

      You die.

      You have 2 lives left.

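      Seriously, don't do that. Pairing malloc with delete[] is clear-cut Undefined Behaviour. (Here the malloc'd buffer is only copied into a new[] allocation and never freed, so it is merely leaked, but the habit is dangerous.)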


    2. Sadly you lose another life here:

      Queue obj(data,10);
      
      Queue *pqueue=&obj;
      q.push(pqueue);
      

      You store a pointer to a local. The object is destroyed when produce() returns, so the queued pointer dangles. More Undefined Behaviour.

      You have 1 life left.


    3. Last life at

      q.pop(&obj);
      

      You pop using an iterator (a pointer). It will be treated as an output iterator. You get a return value that indicates the number of elements popped, and items will be written to (&obj)[0], (&obj)[1], (&obj)[2], etc.

      Guess what? Undefined Behaviour.

      See also: Boost spsc queue segfault

      You died.


    4. You're already dead. But you forsake your afterlife with

      printf("%d\n",obj->getint());
      

      Since pop might not have popped anything (the queue may have been empty), this in itself is Undefined Behaviour.


    The funny part is, you talk about all these constructor requirements but you store pointers in the lockfree queue...?! Just write it:

    typedef std::vector<unsigned char> Data;
    
    class Queue {
        private:
        Data m_data;
    
        public:
        Queue(Data data) : m_data(std::move(data)) {}
        Queue() : m_data() {}
        unsigned char const *getdata() const { return m_data.data(); } 
        size_t               getint()  const { return m_data.size(); } 
    };
    
    boost::lockfree::spsc_queue<Queue> q(10);
    

    Live On Coliru

    Notes:

    • you need to make the consumer check the return value of pop. The push might not have happened yet, and lock-free queues don't block.

    • you don't need that contraption. Just pass vectors all the way:


    #include <boost/lockfree/spsc_queue.hpp>
    #include <thread>
    #include <iostream>
    #include <vector>
    
    typedef std::vector<unsigned char> Queue;
    
    boost::lockfree::spsc_queue<Queue> q(10);
    
    void produce() {
        Queue obj(10, 1);
    
        std::cout << __FUNCTION__ << " - " << obj.size() << "\n";
    
        q.push(std::move(obj));
    }
    
    void consume() {
        Queue obj;
        while (!q.pop(obj)) { }
    
        std::cout << __FUNCTION__ << " - " << obj.size() << "\n";
    }
    
    int main() {
        std::thread t1 {produce};
        std::thread t2 {consume};
    
        t1.join();
        t2.join();
    }