Tags: c++, multithreading, thread-local

Error occurred when using thread_local to maintain a concurrent memory buffer


In the following code, I want to create a memory buffer that multiple threads can read and write concurrently. At any given time, either all threads read the buffer in parallel or all threads write to it in parallel; reads and writes never overlap.

To do this, I use a vector of shared_ptr<vector<uint64_t>>. When a new thread arrives, it is allocated a new vector<uint64_t> and writes only to that vector. No two threads write to the same vector.

I use thread_local variables to track the vector index and the offset that the current thread will write to. When I need to add a new buffer to the memory_ variable, I protect it with a mutex.

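#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using namespace std;

class TestBuffer {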
public:
    thread_local static uint32_t index_;
    thread_local static uint32_t offset_;
    thread_local static bool ready_;

    vector<shared_ptr<vector<uint64_t>>> memory_;
    mutex lock_;

    void init() {
        if (!ready_) {
            new_slab();
            ready_ = true;
        }
    }

    void new_slab() {
        std::lock_guard<mutex> lock(lock_);
        index_ = memory_.size();
        memory_.push_back(make_shared<vector<uint64_t>>(1000));
        offset_ = 0;
    }

    void put(uint64_t value) {
        init();
        if (offset_ == 1000) {
            new_slab();
        }
        if(memory_[index_] == nullptr) {
            cout << "Error" << endl;
        }
        *(memory_[index_]->data() + offset_) = value;
        offset_++;
    }
};

thread_local uint32_t TestBuffer::index_ = 0;
thread_local uint32_t TestBuffer::offset_ = 0;
thread_local bool TestBuffer::ready_ = false;

int main() {
    TestBuffer buffer;
    vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        thread t = thread([&buffer, i]() {
            for (int j = 0; j < 10000; ++j) {
                buffer.put(i * 10000 + j);
            }
        });
        threads.emplace_back(move(t));
    }
    for (auto &t: threads) {
        t.join();
    }
}

The code does not behave as expected: it prints "Error" from the put function. The root cause is that memory_[index_] sometimes returns nullptr. However, I do not understand how this is possible, since I think I have set the values properly. Thanks for the help!


Solution

  • You have a race condition in put caused by new_slab(). put reads memory_[index_] without holding lock_. When another thread's new_slab calls memory_.push_back(), the memory_ vector may need to reallocate its storage, and if put is running while that reallocation is in progress, memory_[index_] may read old storage that is being moved from or has already been freed.

    One solution is to protect the memory_ vector by locking the mutex in put as well:

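    void put(uint64_t value) {
        init();
        if (offset_ == 1000) {
            new_slab();
        }
        {
            // Hold the lock for every access to memory_, so a concurrent
            // push_back cannot reallocate the vector underneath this read.
            std::lock_guard<mutex> lock(lock_);

            if (memory_[index_] == nullptr) {
                cout << "Error" << endl;
            }
            *(memory_[index_]->data() + offset_) = value;
        }
        offset_++;  // thread_local, so no lock is needed here
    }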

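    Another is to reserve the space you need in the memory_ vector ahead of time (for example, by calling memory_.reserve() with the maximum number of slabs before the threads start), so that push_back never has to reallocate.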
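
As a variation on the locking fix, each thread could cache a raw pointer to its current slab in a thread_local variable, so that memory_ is only ever touched while the mutex is held and the common path of put takes no lock. The following is a minimal sketch under that assumption, not code from the question or the answer: SlabBuffer, slab_, kSlabSize, slab_count and run_writers are hypothetical names introduced for illustration. Like the original, it assumes a thread only ever writes to one buffer object during its lifetime, because the thread_local state is static.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical slab size, matching the 1000 used in the question.
constexpr std::size_t kSlabSize = 1000;

class SlabBuffer {
public:
    // Append a value to the calling thread's current slab.
    void put(std::uint64_t value) {
        if (slab_ == nullptr || offset_ == kSlabSize) {
            new_slab();  // first write from this thread, or slab is full
        }
        assert(slab_ != nullptr);
        // No lock needed: slab_ and offset_ are thread_local, and the slab
        // itself is written by this thread only.
        slab_[offset_++] = value;
    }

    // Number of slabs allocated so far (takes the lock to read memory_).
    std::size_t slab_count() {
        std::lock_guard<std::mutex> lock(lock_);
        return memory_.size();
    }

private:
    void new_slab() {
        auto slab = std::make_shared<std::vector<std::uint64_t>>(kSlabSize);
        std::lock_guard<std::mutex> lock(lock_);
        // memory_ may reallocate here, but that only moves the shared_ptr
        // objects; the slab's own storage stays where it is, so the cached
        // pointers held by other threads remain valid.
        memory_.push_back(slab);
        slab_ = slab->data();
        offset_ = 0;
    }

    static thread_local std::uint64_t* slab_;  // current slab of this thread
    static thread_local std::size_t offset_;   // next free index in slab_

    std::vector<std::shared_ptr<std::vector<std::uint64_t>>> memory_;
    std::mutex lock_;
};

thread_local std::uint64_t* SlabBuffer::slab_ = nullptr;
thread_local std::size_t SlabBuffer::offset_ = 0;

// Hypothetical driver: spawns writer threads and returns the slab count.
std::size_t run_writers(int num_threads, int writes_per_thread) {
    SlabBuffer buffer;
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&buffer, i, writes_per_thread]() {
            for (int j = 0; j < writes_per_thread; ++j) {
                buffer.put(static_cast<std::uint64_t>(i) * writes_per_thread + j);
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return buffer.slab_count();
}
```

For instance, run_writers(10, 10000) mirrors the main function in the question: each thread fills ten 1000-element slabs, so the call should return 100. The trade-off is that readers still need to take the lock (or wait until all writers have finished) before walking memory_.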