I have a set of data that must be processed in parallel by multiple threads, and the number of data items is expected to be larger than the number of threads. I decided to put the data into some kind of queue so that each free thread can pop its part and process it until the queue is empty. I could use a simple STL queue and lock it with a mutex whenever I dequeue an element, but I'd like to try a lock-free approach. At the same time, my project is too small to depend on a third-party library that provides lock-free structures; in fact, I only need atomic dequeuing. So I decided to implement my own queue based on a vector, with a "head" index that is incremented atomically:
```cpp
#include <atomic>
#include <cstddef>
#include <vector>

template <typename T>
class AtomicDequeueable
{
public:
    // Assumption: the data vector never changes and outlives this object
    explicit AtomicDequeueable(const std::vector<T>& data) :
        m_data(data),
        m_pointer(0)
    {}

    const T* atomicDequeue()
    {
        if (std::atomic_load(&m_pointer) < m_data.size())
        {
            return &m_data
            [
                std::atomic_fetch_add(&m_pointer, std::size_t(1))
            ];
        }
        return nullptr;
    }

    AtomicDequeueable(const AtomicDequeueable&) = delete;
    AtomicDequeueable& operator=(const AtomicDequeueable&) = delete;

private:
    const std::vector<T>& m_data;  // declared first: initialized first
    std::atomic_size_t m_pointer;
};
```
The thread function looks like this:
```cpp
#include <chrono>
#include <thread>

void f(AtomicDequeueable<Data>& queue)
{
    while (auto dataPtr = queue.atomicDequeue())
    {
        const Data& data = *dataPtr;
        // processing data...
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
```
I have very little experience with lock-free structures and primitives, so I wonder: will my approach work properly? I have tested it on Ideone, of course, but I don't know how it will behave with real data.
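For comparison, the mutex-based alternative mentioned in the question (an STL queue locked by a mutex) might look like the minimal sketch below. `LockedQueue` and `sumWithLockedQueue` are hypothetical names; the sketch assumes C++17 for `std::optional` and copies the elements into a `std::queue` so they can be popped by value.

```cpp
#include <cassert>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical mutex-protected work queue (the "STL queue + mutex" option).
template <typename T>
class LockedQueue
{
public:
    explicit LockedQueue(const std::vector<T>& data)
    {
        for (const T& item : data)
            m_queue.push(item);
    }

    // Pops the front element under the lock; std::nullopt once the queue is empty.
    std::optional<T> dequeue()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty())
            return std::nullopt;
        T item = m_queue.front();
        m_queue.pop();
        return item;
    }

private:
    std::mutex m_mutex;
    std::queue<T> m_queue;
};

// Hypothetical driver: threadCount workers drain the queue, each summing what it got.
long long sumWithLockedQueue(const std::vector<int>& data, unsigned threadCount)
{
    LockedQueue<int> queue(data);
    std::vector<long long> partial(threadCount, 0);  // one slot per worker, never shared
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < threadCount; ++t)
        workers.emplace_back([&queue, &partial, t] {
            while (std::optional<int> item = queue.dequeue())
                partial[t] += *item;
        });
    for (std::thread& w : workers)
        w.join();

    long long total = 0;
    for (long long p : partial)
        total += p;
    return total;
}
```

Every `dequeue` call takes the lock, so the check for emptiness and the removal of the element happen as one indivisible step; that is exactly the property a lock-free version has to reproduce.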
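Currently, your `atomicDequeue` function has a race condition: the check and the increment are two separate atomic operations, so two threads can both execute the first one (the `atomic_load`) before either executes the second (the `atomic_fetch_add`). If only one element is left, both threads pass the `< m_data.size()` check, both increment the counter, and the second one receives index `m_data.size()`, one past the end of the vector. This can be fixed, because you really only need one atomic operation, as in the following change: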
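The bad interleaving can be replayed deterministically in a single thread. In the sketch below, `replayCheckThenIncrement`, `Replay` and the three-element vector are hypothetical; it performs the checks of two imaginary threads A and B first and their two `fetch_add` calls afterwards, starting with exactly one element left:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

struct Replay
{
    bool aPassedCheck;
    bool bPassedCheck;
    std::size_t aIndex;
    std::size_t bIndex;
    std::size_t size;
};

// Single-threaded replay of one bad interleaving of the original atomicDequeue
// (atomic load, then a separate fetch_add) for two hypothetical threads A and B.
Replay replayCheckThenIncrement()
{
    const std::vector<int> data{10, 20, 30};
    std::atomic_size_t pointer(2);  // two elements already consumed, one left

    // Both threads run the check before either one increments.
    bool aPassed = pointer.load() < data.size();  // 2 < 3
    bool bPassed = pointer.load() < data.size();  // still 2 < 3

    // Both threads then increment and use the value they got back as an index.
    std::size_t aIndex = pointer.fetch_add(1);  // 2: the last valid element
    std::size_t bIndex = pointer.fetch_add(1);  // 3: one past the end

    return Replay{aPassed, bPassed, aIndex, bIndex, data.size()};
}
```

Thread B would then evaluate `&m_data[3]` on a three-element vector, which is undefined behaviour. A real multithreaded run can go a long time without hitting this window, so a passing test on Ideone does not rule it out.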
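```cpp
const T* atomicDequeue()
{
    // A single atomic read-modify-write: every caller gets a distinct index.
    auto myIndex = std::atomic_fetch_add(&m_pointer, std::size_t(1));
    if (myIndex >= m_data.size())
        return nullptr;  // exhausted; the counter may keep growing past size()
    return &m_data[myIndex];
}
```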
This works provided nothing modifies the input vector while the threads are running, and the vector outlives the queue (`m_data` is only a reference to it).
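Putting the question's class together with the single-`fetch_add` fix gives the self-contained sketch below. `countDequeues` is a hypothetical test helper, not part of the original code: it fills a vector with each element's own index, lets several threads drain the queue, and reports how many times each element was handed out. Each worker writes only to its own counter vector, so the bookkeeping itself needs no synchronization.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

template <typename T>
class AtomicDequeueable
{
public:
    // Assumption: the vector is not modified while threads run and outlives the queue.
    explicit AtomicDequeueable(const std::vector<T>& data) : m_data(data), m_pointer(0) {}

    AtomicDequeueable(const AtomicDequeueable&) = delete;
    AtomicDequeueable& operator=(const AtomicDequeueable&) = delete;

    // One fetch_add per call: each caller receives a distinct index.
    const T* atomicDequeue()
    {
        std::size_t myIndex = m_pointer.fetch_add(1);
        if (myIndex >= m_data.size())
            return nullptr;
        return &m_data[myIndex];
    }

private:
    const std::vector<T>& m_data;
    std::atomic_size_t m_pointer;
};

// Hypothetical helper: returns, for every element, how often it was dequeued.
std::vector<int> countDequeues(std::size_t itemCount, unsigned threadCount)
{
    std::vector<std::size_t> data(itemCount);
    for (std::size_t i = 0; i < itemCount; ++i)
        data[i] = i;  // each element stores its own index

    AtomicDequeueable<std::size_t> queue(data);
    std::vector<std::vector<int>> perThread(threadCount, std::vector<int>(itemCount, 0));
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < threadCount; ++t)
        workers.emplace_back([&queue, &perThread, t] {
            while (const std::size_t* item = queue.atomicDequeue())
                ++perThread[t][*item];
        });
    for (std::thread& w : workers)
        w.join();

    std::vector<int> total(itemCount, 0);
    for (const std::vector<int>& counts : perThread)
        for (std::size_t i = 0; i < itemCount; ++i)
            total[i] += counts[i];
    return total;
}
```

With this design the counter overshoots `size()`: every worker makes exactly one failing call before it leaves its loop, so the counter ends at `itemCount + threadCount`. With a 64-bit `std::size_t` that overshoot cannot realistically wrap around, which is why no compare-and-swap loop is needed to cap it.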