Search code examples
c++multithreadingboostcircular-buffer

multithreading consumer not producing same output as input using boost ringbuffer


In my main program I am copying a string buffer into a boost ring buffer and then trying to consume that data in a created thread and writing to a file. In the main thread also I am writing the same data to a file but both input and output file is not matching.

I feel I am doing something incredibly stupid. Please help. Also, if there are any suggesting to improve the code that would really be appreciated.

#include <iostream>
#include <vector>
#include <boost/circular_buffer.hpp>
#include <numeric>
#include <assert.h>
#include <thread> 
#include <mutex>
#include <chrono>
#include <time.h>
#include <cstdint>
#include <fstream>
#include <string>

using std::cin;
using std::cout;
using std::endl;
using std::fstream;
using std::string;
#define SOME_FIXED_HARDCODED_NUMBER 40980

class MyClass {
        public:
        std::vector<int8_t> vec;
        public:
        MyClass(std::vector<int8_t> v){ vec = v; }
};

boost::circular_buffer<MyClass> cb(300);
int waiting = 1;
std::mutex my_mutex;
FILE *out_file;
FILE *in_file;

void foo() 
{ 
    while (waiting) {
        std::unique_lock<std::mutex> lock(my_mutex);
        if (!cb.size() || waiting == 0) {
            lock.unlock();
            continue;
        }
        if (!waiting)
            break;
        MyClass local_buf = cb.front();
        cb.pop_front();
        fwrite(local_buf.vec.data(), 1, local_buf.vec.size(), out_file);
    }
} 

int main(int argc, char* argv[])
{
    out_file = fopen("output_data.raw", "w"); 
    in_file = fopen("input_data.raw", "w"); 
    std::thread th1(foo);
    char *buf = {"abc"};
    int counter = 0;

    std::vector<int8_t> mem;
    mem.insert(mem.end(), buf, buf + strlen(buf));
    while (counter < SOME_FIXED_HARDCODED_NUMBER)
    {
        {
            std::unique_lock<std::mutex> lock(my_mutex); 
            /* if the circular buffer is full then wait for consumer to pull the data */
            while (cb.full()) {
                lock.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                std::unique_lock<std::mutex> lock(my_mutex); 
            }
            cb.push_front(MyClass(mem));
            fwrite(mem.data(), 1, mem.size(), in_file);
        }
        counter++;
    }
    waiting = 0;
    th1.join();
    fclose(out_file);
    fclose(in_file);
    return 0;
}

Solution

  •             while (cb.full()) {
                    lock.unlock();
                    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    >>>             std::unique_lock<std::mutex> lock(my_mutex); 
                }
    

    The marked unique_lock doesn't do anything as it will go out of scope immediately and unlock the mutex. Hence once you leave the loop the mutex is not locked and you have a racecondition. Instead, you should use lock.lock() to relock the mutex.

    There is a few more bugs. You are not waiting for your foo thread to actually drain the buffer. It will stop as soon as the waiting flag is set by the main thread. Also, waiting should be an atomic.