Tags: c++, multithreading, c++11, boost, producer-consumer

C++ blocking queue with Boost


I've come up with the following blocking queue implementation, which uses std::vector as the container for the queued elements and Boost for threading and synchronization. I also referred to a similar post.

template<typename T>
class BlockingQueue
{
public:
  explicit BlockingQueue(const std::vector<T>& buf): 
    buffer(buf)
  {}
  explicit BlockingQueue(): buffer()
  {}
  void push(const T& elem);
  T pop();
  ~BlockingQueue()
  {}

private:
  boost::mutex mutex;                             // guards buffer
  boost::condition_variable_any notEmptyCond;     // signalled by push() so that a blocked pop() can re-check the queue
  std::vector<T> buffer;
};

template<typename T>
void BlockingQueue<T>::push(const T& elem) 
{
  boost::mutex::scoped_lock lock(mutex);
  buffer.push_back(elem);
  notEmptyCond.notify_one();                      // notifies one of the waiting threads which are blocked on the queue  
  // assert(!buffer.empty());
}

template<typename T>
T BlockingQueue<T>::pop()
{
  boost::mutex::scoped_lock lock(mutex);
  notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); });   // waits for the queue to get filled and for a notification, to resume consuming
  T elem = buffer.front();
  buffer.erase(buffer.begin());
  return elem;
}

I have two threads (producer and consumer): one reads strings from a file and pushes them into the BlockingQueue, the other removes the strings from the BlockingQueue and prints them. Both are started from a class whose definition is given below.

class FileProcessor
{
public:
  explicit FileProcessor():bqueue(),inFile("random.txt")
  {
    rt = boost::thread(boost::bind(&FileProcessor::read, this));
    pt1 = boost::thread(boost::bind(&FileProcessor::process, this));
  }

  ~FileProcessor()
  {
    rt.interrupt(); 
    pt1.interrupt(); 
    rt.join(); 
    pt1.join(); 
  }

  /* Read strings from a file, populate them in the blocking-queue */
  void read()
  {
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
    boost::iostreams::filtering_istream in;
    if (file.fail()) {
      std::cout << "couldn't open the input file.. please check its name and read permissions\n";
      return;
    }
    try {
      in.push(file);                      
      for(std::string inputStr; std::getline(in,inputStr);) 
      {
        bqueue.push(inputStr);
        std::cout << "inserted " << inputStr << "\n";
      }
    }
    catch(std::exception& e) {
      std::cout << "exception occurred while reading file\n" << e.what() << "\n";
    }
  }

  /* Process the elements (dequeue and print) */
  void process()
  {
    while (true)
    {
      std::string rstr = bqueue.pop();
      std::cout << "consumed " << rstr << "\n";
    }
  }

private:
  boost::mutex mutex;
  boost::thread rt;
  boost::thread pt1;
  BlockingQueue<std::string> bqueue;
  std::string inFile;     // input file name from where the strings are read
};

I observe the following output (only a snapshot included):

Run 1:

inserted AZ
inserted yezjAdCeV
inserted icKU
inserted q
inserted b
inserted DRQL
inserted aaOj
inserted CqlNRv
inserted e
inserted XuDemby
inserted rE
inserted YPk
inserted dLd
inserted xb
inserted bSrZdf
inserted sCQiRna
...

Run 4:

consumed jfRnjSxrw
inserted INdmXSCr
consumed oIDlu
inserted FfXdARGu
consumed tAO
inserted mBq
consumed I
inserted aoXNhP
consumed OOAf
inserted Qoi
consumed wCxJXGWJu
inserted WZGYHluTV
consumed oIFOh
inserted kkIoFF
consumed ecAYyjHh
inserted C
consumed KdrBIixw
inserted Ldeyjtxe
...

My problem: the consumer thread sometimes gets access to the queue (it dequeues and prints) and sometimes does not. I'm not sure why this happens. Any hints about flaws in the queue's design would be greatly appreciated. Thanks!

Observations:

  1. When the threads aren't started from the FileProcessor constructor, they behave as expected, i.e. both access the BlockingQueue and perform their read/write operations. The snippets below show the changes that produce this behavior.

  2. The producer and consumer threads don't take alternate turns; as @n.m noted, the producer doesn't explicitly yield to the consumer. With the change from the first observation, the output looked like this:

    inserted DZxcOw
    consumed inserted DZxcOw
    consumed robECjOp
    robECjOp
    inserted BaILFsVaA
    inserted HomURR
    inserted PVjLPb
    consumed BaILFsVaA
    consumed HomURR
    consumed PVjLPb
    inserted SHdBVSEyU
    consumed SHdBVSEyU
    consumed JaEH
    inserted JaEH
    inserted g
    inserted MwEgOVB
    inserted qlohoszv
    consumed g
    consumed MwEgOVB
    consumed qlohoszv
    consumed AsQgq
    inserted AsQgq
    inserted tbm
    inserted iriADeEL
    inserted Zoxs
    consumed tbm
    

Initializing the threads from outside the class constructor:

#include <iostream>
#include <threading/file_processor.h>  //has the FileProcessor class declaration

int main()
{
  FileProcessor fp;  //previously, I had only this statement which called the class constructor, from where the threads were initialized.
  boost::thread rt(boost::bind(&FileProcessor::read, &fp));
  boost::thread pt1(boost::bind(&FileProcessor::process, &fp));
  rt.join();
  pt1.join();
  return 0;
}

Modified FileProcessor class (thread initialization removed from its constructor):

#include <fstream>
#include <iostream>
#include <string>
#include <boost/iostreams/filtering_stream.hpp>
#include <threading/blocking_queue.h>  //has the BlockingQueue class

using namespace boost::iostreams;

class FileProcessor
{
public:
  explicit FileProcessor():bqueue(),inFile("random.txt")
  {}

  ~FileProcessor()
  {}

  void read()
  {
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
    filtering_istream in;
    if (file.fail()) {
      std::cout << "couldn't open the input file.. please check its name and read permissions\n";
      return;
    }
    try {
      in.push(file);
      for(std::string inputStr; std::getline(in,inputStr);) 
      {
        bqueue.push(inputStr);
        std::cout << "inserted " << inputStr << "\n";
      }
    }
    catch(std::exception& e) {
      std::cout << "exception occurred while reading file\n" << e.what() << "\n";
    }
  }

  void process()
  {
    while (true)
    {
      std::string rstr = bqueue.pop();
      std::cout << "consumed " << rstr << "\n";
    }
  }

private:
  BlockingQueue<std::string> bqueue;
  std::string inFile;     // input file name from where the strings are read
};

Edits:

24 May 2017 : Removed the inaccurate comment "gets the whole file content into a buffer".


Solution

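  • Indeed there's no design flaw, just flawed expectations of how threads are scheduled by the OS. Nothing in an unbounded queue forces the two threads to alternate: the producer never blocks, so it may push many lines before the consumer gets to run.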

    Here's a version that adds a maximum queue "depth" (capacity) and makes push block once the queue has reached capacity. The demo then uses a capacity of 1 to show perfect turn-by-turn consumption (of course this is suboptimal, performance-wise).

    I've replaced the condition_variable_any conditions with regular condition_variable ones, which is possible because the lock is a plain boost::unique_lock<boost::mutex>. I've momentarily dropped iostreams use (note that the comment // gets the whole file content into an input stream buffer is completely inaccurate anyway).
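
To make the difference between the two condition types concrete, here is a minimal sketch. It assumes the standard-library counterparts (std::condition_variable and std::condition_variable_any) behave like the Boost ones for this purpose, and the Gate, AnyGate and open_from_other_thread names are made up for the example. The plain variant only accepts std::unique_lock<std::mutex>; the _any variant accepts other lock types, such as a lock on a recursive mutex, and may be less efficient in exchange for that flexibility.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot gate built on the plain condition variable.
class Gate {
  public:
    void open() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            open_ = true;
        }
        cond_.notify_all();
    }
    // Blocks until open() has been called; returns the flag seen under the lock.
    bool wait() {
        std::unique_lock<std::mutex> lock(mutex_); // the only lock type accepted
        cond_.wait(lock, [this] { return open_; });
        return open_;
    }

  private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool open_ = false;
};

// The same gate guarded by a recursive_mutex, which requires the _any variant.
class AnyGate {
  public:
    void open() {
        {
            std::lock_guard<std::recursive_mutex> lock(mutex_);
            open_ = true;
        }
        cond_.notify_all();
    }
    bool wait() {
        std::unique_lock<std::recursive_mutex> lock(mutex_);
        cond_.wait(lock, [this] { return open_; });
        return open_;
    }

  private:
    std::recursive_mutex mutex_;
    std::condition_variable_any cond_;
    bool open_ = false;
};

// Opens the gate from a second thread and reports what the waiter observed.
template <typename G> bool open_from_other_thread() {
    G gate;
    std::thread opener([&gate] { gate.open(); });
    bool seen_open = gate.wait();
    opener.join();
    return seen_open;
}
```

Calling open_from_other_thread<Gate>() and open_from_other_thread<AnyGate>() exercises both variants; when the mutex is an ordinary one, as in the queue here, the plain condition variable is the natural choice.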


    #include <boost/thread.hpp>
    #include <cassert>
    #include <deque>
    #include <fstream>
    #include <iostream>
    #include <string>
    
    static boost::mutex s_iomutex; // serializes console output across threads
    
    template <typename T> class BlockingQueue {
      public:
        explicit BlockingQueue(size_t capacity) : _buffer(), _capacity(capacity) {
            assert(capacity>0);
        }
    
        void push(const T &elem) {
            boost::unique_lock<boost::mutex> lock(_mutex);
            // block while the queue is full
            _pop_event.wait(lock, [&] { return _buffer.size() < _capacity; });
            _buffer.push_back(elem);
            _push_event.notify_one(); // wake one consumer blocked in pop()
        }
    
        T pop() {
            boost::unique_lock<boost::mutex> lock(_mutex);
            // block while the queue is empty
            _push_event.wait(lock, [&] { return _buffer.size() > 0; });
    
            T elem = _buffer.front();
            _buffer.pop_front();
            _pop_event.notify_one(); // wake one producer blocked in push()
            return elem;
        }
    
      private:
        boost::mutex _mutex;
        boost::condition_variable _push_event, _pop_event;
        std::deque<T> _buffer;
        size_t _capacity;
    };
    
    class FileProcessor {
      public:
        explicit FileProcessor(size_t capacity = 10) : bqueue(capacity), inFile("random.txt") {}
    
        /* Read strings from a file, populate them in the blocking-queue */
        void read() {
            try {
                std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
                for (std::string inputStr; std::getline(file, inputStr);) {
                    bqueue.push(inputStr);
    
                    boost::lock_guard<boost::mutex> lock(s_iomutex);
                    std::cout << "inserted " << inputStr << "\n";
                }
            } catch (std::exception &e) {
                std::cerr << "exception occurred while reading file\n" << e.what() << "\n";
            }
        }
    
        /* Process the elements (dequeue and print) */
        void process() {
            while (true) {
                std::string rstr = bqueue.pop();
                boost::lock_guard<boost::mutex> lock(s_iomutex);
                std::cout << "consumed " << rstr << "\n";
            }
        }
    
      private:
        BlockingQueue<std::string> bqueue;
        std::string inFile; // input file name from where the strings are read
    };
    
    int main() {
        FileProcessor fp(1);
        boost::thread rt(boost::bind(&FileProcessor::read, &fp));
        boost::thread pt1(boost::bind(&FileProcessor::process, &fp));
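        rt.join(); // wait until the whole file has been pushed
    
        pt1.interrupt(); // the consumer loops forever; interrupt its blocking wait
        pt1.join();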
    }
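
The s_iomutex guard in the listing above is what keeps each "inserted" or "consumed" line intact; without it, two threads can interleave partial writes, as in the garbled output shown in the question. A minimal standalone sketch of the same idea follows. It swaps in std::mutex for boost::mutex and writes to a std::ostringstream so the result can be inspected; log_line and run_two_loggers are hypothetical names introduced only for this illustration.

```cpp
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>

static std::mutex s_iomutex; // one mutex shared by every thread that logs

// Writes prefix, payload and newline as one unit while holding the lock.
void log_line(std::ostream &out, const std::string &prefix, const std::string &payload) {
    std::lock_guard<std::mutex> lock(s_iomutex);
    out << prefix << ' ' << payload << '\n';
}

// Runs two threads that each log `count` lines into the same stream and
// returns everything that was written.
std::string run_two_loggers(int count) {
    std::ostringstream out;
    auto worker = [&out, count](const std::string &prefix) {
        for (int i = 0; i < count; ++i) {
            log_line(out, prefix, std::to_string(i));
        }
    };
    std::thread a(worker, "inserted");
    std::thread b(worker, "consumed");
    a.join();
    b.join();
    return out.str();
}
```

The order of the lines still depends on scheduling; the mutex only guarantees that each line is written whole.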
    

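    Prints (the queue operations alternate strictly; the occasional pair of consecutive "inserted" lines, e.g. items 1-2 and 116-117, appears only because each message is printed after the queue lock has been released)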

    inserted 1 15786
    inserted 2 2099
    consumed 1 15786
    consumed 2 2099
    inserted 3 23963
    consumed 3 23963
    inserted 4 6928
    consumed 4 6928
    inserted 5 16279
    consumed 5 16279
    inserted 6 26787
    consumed 6 26787
    inserted 7 13463
    consumed 7 13463
    inserted 8 14099
    consumed 8 14099
    inserted 9 21808
    consumed 9 21808
    inserted 10 22618
    consumed 10 22618
    inserted 11 10618
    consumed 11 10618
    inserted 12 8211
    consumed 12 8211
    inserted 13 32033
    consumed 13 32033
    inserted 14 14512
    consumed 14 14512
    inserted 15 17734
    consumed 15 17734
    inserted 16 3632
    consumed 16 3632
    inserted 17 8265
    consumed 17 8265
    inserted 18 17922
    consumed 18 17922
    inserted 19 15753
    consumed 19 15753
    inserted 20 7474
    consumed 20 7474
    inserted 21 20136
    consumed 21 20136
    inserted 22 12334
    consumed 22 12334
    inserted 23 23299
    consumed 23 23299
    inserted 24 4066
    consumed 24 4066
    inserted 25 5173
    consumed 25 5173
    inserted 26 17640
    consumed 26 17640
    inserted 27 19218
    consumed 27 19218
    inserted 28 26387
    consumed 28 26387
    inserted 29 26357
    consumed 29 26357
    inserted 30 15206
    consumed 30 15206
    inserted 31 28714
    consumed 31 28714
    inserted 32 32648
    consumed 32 32648
    inserted 33 1500
    consumed 33 1500
    inserted 34 20941
    consumed 34 20941
    inserted 35 3838
    consumed 35 3838
    inserted 36 29680
    consumed 36 29680
    inserted 37 24626
    consumed 37 24626
    inserted 38 14824
    consumed 38 14824
    inserted 39 19690
    consumed 39 19690
    inserted 40 27815
    consumed 40 27815
    inserted 41 6760
    consumed 41 6760
    inserted 42 21322
    consumed 42 21322
    inserted 43 17966
    consumed 43 17966
    inserted 44 15292
    consumed 44 15292
    inserted 45 23321
    consumed 45 23321
    inserted 46 7437
    consumed 46 7437
    inserted 47 5444
    consumed 47 5444
    inserted 48 26785
    consumed 48 26785
    inserted 49 22430
    consumed 49 22430
    inserted 50 25417
    consumed 50 25417
    inserted 51 10408
    consumed 51 10408
    inserted 52 32096
    consumed 52 32096
    inserted 53 489
    consumed 53 489
    inserted 54 7083
    consumed 54 7083
    inserted 55 21555
    consumed 55 21555
    inserted 56 3759
    consumed 56 3759
    inserted 57 20811
    consumed 57 20811
    inserted 58 20176
    consumed 58 20176
    inserted 59 31305
    consumed 59 31305
    inserted 60 9894
    consumed 60 9894
    inserted 61 5515
    consumed 61 5515
    inserted 62 9978
    consumed 62 9978
    inserted 63 1981
    consumed 63 1981
    inserted 64 22286
    consumed 64 22286
    inserted 65 11081
    consumed 65 11081
    inserted 66 4392
    consumed 66 4392
    inserted 67 2252
    consumed 67 2252
    inserted 68 16714
    consumed 68 16714
    inserted 69 16003
    consumed 69 16003
    inserted 70 16695
    consumed 70 16695
    inserted 71 11288
    consumed 71 11288
    inserted 72 4788
    consumed 72 4788
    inserted 73 14454
    consumed 73 14454
    inserted 74 29920
    consumed 74 29920
    inserted 75 25154
    consumed 75 25154
    inserted 76 6206
    consumed 76 6206
    inserted 77 14444
    consumed 77 14444
    inserted 78 2921
    consumed 78 2921
    inserted 79 26908
    consumed 79 26908
    inserted 80 24148
    consumed 80 24148
    inserted 81 8487
    consumed 81 8487
    inserted 82 11371
    consumed 82 11371
    inserted 83 31047
    consumed 83 31047
    inserted 84 27749
    consumed 84 27749
    inserted 85 13548
    consumed 85 13548
    inserted 86 13807
    consumed 86 13807
    inserted 87 9411
    consumed 87 9411
    inserted 88 21999
    consumed 88 21999
    inserted 89 24386
    consumed 89 24386
    inserted 90 10190
    consumed 90 10190
    inserted 91 2472
    consumed 91 2472
    inserted 92 17149
    consumed 92 17149
    inserted 93 14288
    consumed 93 14288
    inserted 94 31625
    consumed 94 31625
    inserted 95 4732
    consumed 95 4732
    inserted 96 20273
    consumed 96 20273
    inserted 97 29036
    consumed 97 29036
    inserted 98 4425
    consumed 98 4425
    inserted 99 1563
    consumed 99 1563
    inserted 100 2796
    consumed 100 2796
    inserted 101 24374
    consumed 101 24374
    inserted 102 8151
    consumed 102 8151
    inserted 103 31361
    consumed 103 31361
    inserted 104 22466
    consumed 104 22466
    inserted 105 23365
    consumed 105 23365
    inserted 106 23762
    consumed 106 23762
    inserted 107 3616
    consumed 107 3616
    inserted 108 7711
    consumed 108 7711
    inserted 109 23178
    consumed 109 23178
    inserted 110 18791
    consumed 110 18791
    inserted 111 13371
    consumed 111 13371
    inserted 112 14553
    consumed 112 14553
    inserted 113 32026
    consumed 113 32026
    inserted 114 4567
    consumed 114 4567
    inserted 115 22178
    consumed 115 22178
    inserted 116 23947
    inserted 117 5928
    consumed 116 23947
    consumed 117 5928
    inserted 118 25606
    consumed 118 25606
    inserted 119 5141
    consumed 119 5141
    inserted 120 17681
    consumed 120 17681
    inserted 121 8024
    consumed 121 8024
    inserted 122 9094
    consumed 122 9094
    inserted 123 24878
    consumed 123 24878
    inserted 124 27800
    consumed 124 27800
    inserted 125 10225
    consumed 125 10225
    inserted 126 1157
    consumed 126 1157
    inserted 127 28217
    consumed 127 28217
    inserted 128 15144
    consumed 128 15144
    inserted 129 25692
    consumed 129 25692
    inserted 130 250
    consumed 130 250
    inserted 131 17432
    consumed 131 17432
    inserted 132 10055
    consumed 132 10055
    inserted 133 24279
    consumed 133 24279
    inserted 134 9445
    consumed 134 9445
    inserted 135 4149
    consumed 135 4149
    inserted 136 23240
    consumed 136 23240
    inserted 137 23146
    consumed 137 23146
    inserted 138 8576
    consumed 138 8576
    inserted 139 11469
    consumed 139 11469
    inserted 140 27250
    consumed 140 27250
    inserted 141 12203
    consumed 141 12203
    inserted 142 21730
    consumed 142 21730
    inserted 143 30824
    consumed 143 30824
    inserted 144 11197
    consumed 144 11197
    inserted 145 11076
    consumed 145 11076
    inserted 146 6960
    consumed 146 6960
    inserted 147 7313
    consumed 147 7313
    inserted 148 16701
    consumed 148 16701
    inserted 149 21044
    consumed 149 21044
    inserted 150 9934
    consumed 150 9934
    inserted 151 18562
    consumed 151 18562
    inserted 152 3559
    consumed 152 3559
    inserted 153 5541
    consumed 153 5541
    inserted 154 16024
    consumed 154 16024
    inserted 155 9877
    consumed 155 9877
    inserted 156 18443
    consumed 156 18443
    inserted 157 6312
    consumed 157 6312
    inserted 158 24237
    consumed 158 24237
    inserted 159 27685
    consumed 159 27685
    inserted 160 6154
    consumed 160 6154
    inserted 161 32723
    consumed 161 32723
    inserted 162 8358
    consumed 162 8358
    inserted 163 5518
    consumed 163 5518
    inserted 164 15857
    consumed 164 15857
    inserted 165 26383
    consumed 165 26383
    inserted 166 13179
    consumed 166 13179
    inserted 167 29919
    consumed 167 29919
    inserted 168 5135
    consumed 168 5135
    inserted 169 7147
    consumed 169 7147
    inserted 170 4383
    consumed 170 4383
    inserted 171 13147
    consumed 171 13147
    inserted 172 15658
    consumed 172 15658
    inserted 173 18478
    consumed 173 18478
    inserted 174 29793
    consumed 174 29793
    inserted 175 16003
    consumed 175 16003
    inserted 176 12804
    consumed 176 12804
    inserted 177 25713
    consumed 177 25713
    inserted 178 28108
    consumed 178 28108
    inserted 179 8518
    consumed 179 8518
    inserted 180 9874
    consumed 180 9874
    inserted 181 30731
    consumed 181 30731
    inserted 182 15582
    consumed 182 15582
    inserted 183 12589
    consumed 183 12589
    inserted 184 15839
    consumed 184 15839
    inserted 185 19505
    consumed 185 19505
    inserted 186 20543
    consumed 186 20543
    inserted 187 6331
    consumed 187 6331
    inserted 188 25289
    consumed 188 25289
    inserted 189 14877
    consumed 189 14877
    inserted 190 25571
    consumed 190 25571
    inserted 191 10873
    consumed 191 10873
    inserted 192 13568
    consumed 192 13568
    inserted 193 16319
    consumed 193 16319
    inserted 194 28590
    consumed 194 28590
    inserted 195 22303
    consumed 195 22303
    inserted 196 20685
    consumed 196 20685
    inserted 197 1528
    consumed 197 1528
    inserted 198 5200
    consumed 198 5200
    inserted 199 25689
    consumed 199 25689
    inserted 200 25140
    consumed 200 25140
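
The demo stops the consumer with pt1.interrupt(), which is specific to Boost.Thread; std::thread offers no interruption. One possible alternative, sketched here as an illustration and not as part of the original answer, is to give the queue a close() operation so that pop() can report that no more data will arrive. ClosableQueue, close(), the bool-returning push()/pop() and the pump helper are all made-up names, and the sketch uses the standard library in place of Boost.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

template <typename T> class ClosableQueue {
  public:
    explicit ClosableQueue(std::size_t capacity) : _capacity(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the queue is full; returns false if the queue was closed.
    bool push(T elem) {
        std::unique_lock<std::mutex> lock(_mutex);
        _pop_event.wait(lock, [this] { return _closed || _buffer.size() < _capacity; });
        if (_closed) return false;
        _buffer.push_back(std::move(elem));
        _push_event.notify_one();
        return true;
    }

    // Blocks while the queue is empty and still open; returns false only
    // once the queue is both closed and drained.
    bool pop(T &out) {
        std::unique_lock<std::mutex> lock(_mutex);
        _push_event.wait(lock, [this] { return _closed || !_buffer.empty(); });
        if (_buffer.empty()) return false;
        out = std::move(_buffer.front());
        _buffer.pop_front();
        _pop_event.notify_one();
        return true;
    }

    // Marks the end of input and wakes every waiting thread.
    void close() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _closed = true;
        }
        _push_event.notify_all();
        _pop_event.notify_all();
    }

  private:
    std::mutex _mutex;
    std::condition_variable _push_event, _pop_event;
    std::deque<T> _buffer;
    std::size_t _capacity;
    bool _closed = false;
};

// Pushes `items` through a queue of the given capacity from a producer
// thread and returns what the consumer thread received, in order.
std::vector<std::string> pump(const std::vector<std::string> &items, std::size_t capacity) {
    ClosableQueue<std::string> queue(capacity);
    std::vector<std::string> received;
    std::thread consumer([&] {
        for (std::string s; queue.pop(s);) received.push_back(s);
    });
    std::thread producer([&] {
        for (const auto &s : items) queue.push(s);
        queue.close(); // the consumer's loop ends once the queue is drained
    });
    producer.join();
    consumer.join();
    return received;
}
```

With this shape the consumer loop becomes `for (std::string s; queue.pop(s);) ...` and ends after the last element has been handed over, so main only needs to join both threads.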