In the following code, I want to create a memory buffer that allows multiple threads to read/write it concurrently. At a time, all threads will read this buffer in parallel, and later they will write to the buffer in parallel. But there will be no read/write operation at the same time.
To do this, I use a vector
of shared_ptr<vector<uint64_t>>
. When a new thread arrives, it will be allocated with a new vector<uint64_t>
and only write to it. Two threads will not write to the same vector.
I use thread_local to track the vector index and offset the current thread will write to. When I need to add a new buffer to the memory_
variable, I use a mutex to protect it.
class TestBuffer {
public:
thread_local static uint32_t index_;
thread_local static uint32_t offset_;
thread_local static bool ready_;
vector<shared_ptr<vector<uint64_t>>> memory_;
mutex lock_;
void init() {
if (!ready_) {
new_slab();
ready_ = true;
}
}
void new_slab() {
std::lock_guard<mutex> lock(lock_);
index_ = memory_.size();
memory_.push_back(make_shared<vector<uint64_t>>(1000));
offset_ = 0;
}
void put(uint64_t value) {
init();
if (offset_ == 1000) {
new_slab();
}
if(memory_[index_] == nullptr) {
cout << "Error" << endl;
}
*(memory_[index_]->data() + offset_) = value;
offset_++;
}
};
thread_local uint32_t TestBuffer::index_ = 0;
thread_local uint32_t TestBuffer::offset_ = 0;
thread_local bool TestBuffer::ready_ = false;
int main() {
TestBuffer buffer;
vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
thread t = thread([&buffer, i]() {
for (int j = 0; j < 10000; ++j) {
buffer.put(i * 10000 + j);
}
});
threads.emplace_back(move(t));
}
for (auto &t: threads) {
t.join();
}
}
The code does not behave as expected, and reports error is in the put
function. The root cause is that memory_[index_]
sometimes return nullptr
. However, I do not understand why this is possible as I think I have set the values properly. Thanks for the help!
You have a race condition in put
caused by new_slab()
. When new_slab
calls memory_.push_back()
the _memory
vector may need to resize itself, and if another thread is executing put
while the resize is in progress, memory_[index_]
might access stale data.
One solution is to protect the _memory
vector by locking the mutex:
{
std::lock_guard<mutex> lock(lock_);
if(memory_[index_] == nullptr) {
cout << "Error" << endl;
}
*(memory_[index_]->data() + offset_) = value;
}
Another is to reserve the space you need in the memory_
vector ahead of time.