I implemented a lock-free queue based on the algorithm in Maged M. Michael and Michael L. Scott's paper "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" (the algorithm itself is on page 4).
I used the atomic operations on shared_ptr, such as std::atomic_load_explicit.
When I use the queue from a single thread, everything is fine, but when I use it from several threads, I get a stack overflow exception.
Unfortunately, I could not trace the source of the problem. It seems that when one shared_ptr goes out of scope, it decrements the reference count of the next ConcurrentQueueNode, and that causes a very deep (apparently infinite) recursion, but I can't see why.
The code:
The queue node:
#include <atomic>
#include <memory>
#include <utility>

template<class T>
struct ConcurrentQueueNode {
    T m_Data;
    std::shared_ptr<ConcurrentQueueNode> m_Next;

    template<class ... Args>
    ConcurrentQueueNode(Args&& ... args) :
        m_Data(std::forward<Args>(args)...) {}

    std::shared_ptr<ConcurrentQueueNode>& getNext() {
        return m_Next;
    }

    // Moves the stored value out of the node.
    T getValue() {
        return std::move(m_Data);
    }
};
The concurrent queue (note: not for the faint-hearted):
template<class T>
class ConcurrentQueue {
    std::shared_ptr<ConcurrentQueueNode<T>> m_Head, m_Tail;

public:
    ConcurrentQueue(){
        m_Head = m_Tail = std::make_shared<ConcurrentQueueNode<T>>();
    }

    template<class ... Args>
    void push(Args&& ... args) {
        auto node = std::make_shared<ConcurrentQueueNode<T>>(std::forward<Args>(args)...);
        std::shared_ptr<ConcurrentQueueNode<T>> tail;
        for (;;) {
            tail = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire);
            std::shared_ptr<ConcurrentQueueNode<T>> next =
                std::atomic_load_explicit(&tail->getNext(),std::memory_order_acquire);
            if (tail == std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)) {
                if (next.get() == nullptr) {
                    auto currentNext = std::atomic_load_explicit(&m_Tail, std::memory_order_acquire)->getNext();
                    auto res = std::atomic_compare_exchange_weak(&tail->getNext(), &next, node);
                    if (res) {
                        break;
                    }
                }
                else {
                    std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
                }
            }
        }
        std::atomic_compare_exchange_strong(&m_Tail, &tail, node);
    }

    bool tryPop(T& dest) {
        std::shared_ptr<ConcurrentQueueNode<T>> head;
        for (;;) {
            head = std::atomic_load_explicit(&m_Head, std::memory_order_acquire);
            auto tail = std::atomic_load_explicit(&m_Tail,std::memory_order_acquire);
            auto next = std::atomic_load_explicit(&head->getNext(), std::memory_order_acquire);
            if (head == std::atomic_load_explicit(&m_Head, std::memory_order_acquire)) {
                if (head.get() == tail.get()) {
                    if (next.get() == nullptr) {
                        return false;
                    }
                    std::atomic_compare_exchange_weak(&m_Tail, &tail, next);
                }
                else {
                    dest = next->getValue();
                    auto res = std::atomic_compare_exchange_weak(&m_Head, &head, next);
                    if (res) {
                        break;
                    }
                }
            }
        }
        return true;
    }
};
Example usage that reproduces the problem:
#include <thread>

int main(){
    ConcurrentQueue<int> queue;
    std::thread threads[4];
    // Four threads, each pushing and then popping 100,000 values.
    for (auto& thread : threads) {
        thread = std::thread([&queue] {
            for (auto i = 0; i < 100'000; i++) {
                queue.push(i);
                int y;
                queue.tryPop(y);
            }
        });
    }
    for (auto& thread : threads) {
        thread.join();
    }
    return 0;
}
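The problem is a race condition that can leave every node in the queue waiting to be freed all at once. Each node owns the next one through m_Next, so releasing the first node runs its destructor, which releases the second node, and so on down the chain. That destruction is recursive, with nested calls for every node, and it blows your stack.
If you change your test to use only one thread and never pop, you get the same stack overflow error every time: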
    for (auto i = 1; i < 100000; i++) {
        queue.push(i);
        //int y;
        //queue.tryPop(y);
    }
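To make the recursion visible without actually overflowing the stack, here is a minimal sketch that counts how deeply node destructors nest for a short chain. `DepthProbeNode`, `g_depth`, `g_maxDepth` and `measureTeardownDepth` are names made up for this illustration; they are not part of the queue above. The destructor calls `m_Next.reset()` explicitly, which I am assuming is a fair stand-in for the release the implicit member destructor would perform anyway, just at a point where it can be counted.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <utility>

// Illustration-only counters (hypothetical, not part of the queue).
int g_depth = 0;     // node destructors currently active on the stack
int g_maxDepth = 0;  // deepest nesting observed so far

// Stand-in for ConcurrentQueueNode: only the owning link matters here.
struct DepthProbeNode {
    std::shared_ptr<DepthProbeNode> m_Next;

    ~DepthProbeNode() {
        ++g_depth;
        g_maxDepth = std::max(g_maxDepth, g_depth);
        // Releasing the link here destroys the next node (if we are its
        // last owner) while this destructor is still on the stack.
        m_Next.reset();
        --g_depth;
    }
};

// Builds a chain of `length` nodes, drops the only reference to its head,
// and returns how deeply the destructors nested while it was torn down.
int measureTeardownDepth(int length) {
    g_depth = 0;
    g_maxDepth = 0;
    std::shared_ptr<DepthProbeNode> head;
    for (int i = 0; i < length; ++i) {
        auto node = std::make_shared<DepthProbeNode>();
        node->m_Next = std::move(head);  // prepend, so building stays iterative
        head = std::move(node);
    }
    head.reset();  // the whole chain is released from this single call
    return g_maxDepth;
}
```

Calling `measureTeardownDepth(500)` reports a nesting depth equal to the chain length, so with the 100,000 nodes from the test the nested calls can easily exceed a default-sized stack.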
You need to make deleting the chain of nodes non-recursive:
    ~ConcurrentQueueNode() {
        // Nothing to unwind if there is no successor or someone else still owns it.
        if (!m_Next || m_Next.use_count() > 1)
            return;
        KillChainOfDeath();
    }

    // Frees the chain in a loop, one node per pass, instead of letting each
    // node's destructor recurse into the next one.
    void KillChainOfDeath() {
        auto pThis = this;
        std::shared_ptr<ConcurrentQueueNode> Next, Prev;
        for (;;) {
            if (pThis->m_Next.use_count() > 1)
                break;                  // successor is shared; stop here
            Next.swap(pThis->m_Next);   // unwire node
            Prev = nullptr;             // free the node that we unwired in the previous pass
            if (!(pThis = Next.get()))  // move to next node
                break;
            Prev.swap(Next);            // else the next Next.swap would free it before it is unwired
        }
    }
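Another way to avoid the recursion is to tear the chain down from the owning side, for example from the queue's destructor, by detaching each node's link before releasing the node. The following is only a sketch under assumptions: `ChainNode`, `g_liveNodes`, `buildChain` and `destroyChainIteratively` are hypothetical names, and the teardown is assumed to run on a single thread after all producers and consumers have finished.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Illustration-only counter of nodes that are currently alive.
int g_liveNodes = 0;

// Hypothetical stand-in for ConcurrentQueueNode<int>.
struct ChainNode {
    int m_Data;
    std::shared_ptr<ChainNode> m_Next;

    explicit ChainNode(int value) : m_Data(value) { ++g_liveNodes; }
    ~ChainNode() { --g_liveNodes; }
};

// Builds a singly linked chain of `length` nodes by prepending.
std::shared_ptr<ChainNode> buildChain(int length) {
    std::shared_ptr<ChainNode> head;
    for (int i = 0; i < length; ++i) {
        auto node = std::make_shared<ChainNode>(i);
        node->m_Next = std::move(head);
        head = std::move(node);
    }
    return head;
}

// Releases the chain one node per loop pass. A node is only unlinked while
// we are its sole owner; if anyone else still holds it, we stop and leave
// the rest of the chain to that owner.
void destroyChainIteratively(std::shared_ptr<ChainNode> head) {
    while (head && head.use_count() == 1) {
        // Detach the successor first, so destroying the current node
        // finds an empty m_Next and cannot recurse.
        std::shared_ptr<ChainNode> next = std::move(head->m_Next);
        head = std::move(next);  // frees the old node, advances to the next
    }
}
```

In the queue this could be called as `destroyChainIteratively(std::move(m_Head))` from `~ConcurrentQueue()`. Note that it does not help when a long chain is released from somewhere else, such as a thread's local `head` copy going out of scope, which is the case the node destructor above covers.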
I have never used shared_ptr before, so I don't know if there is a faster way to do this. For the same reason, I don't know whether your algorithm suffers from ABA issues. Unless something in the shared_ptr implementation prevents ABA, I worry that previously freed nodes could be reused and fool the CAS. I never ran into that problem when I ran your code, though.
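On the address-reuse part of the ABA worry, one property can at least be checked in isolation: as long as a thread still holds a shared_ptr copy of a node, that node stays alive, so no newly allocated node can occupy the same address. The sketch below shows only this, on a single thread; `AbaProbeNode` and `addressReusedWhileSnapshotHeld` are hypothetical names, and it is not a proof that the queue as a whole is free of ABA problems.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a queue node.
struct AbaProbeNode {
    int m_Data = 0;
};

// Simulates the classic ABA setup: a thread takes a snapshot of the head,
// then the head is replaced by a freshly allocated node. Returns true if
// the fresh node ended up at the snapshot's address.
bool addressReusedWhileSnapshotHeld() {
    auto slot = std::make_shared<AbaProbeNode>();  // plays the role of m_Head
    auto snapshot = slot;                          // local copy taken before a CAS

    // "Another thread" pops the old head and a new node is allocated.
    slot = std::make_shared<AbaProbeNode>();

    // The old node is still owned by `snapshot`, so it has not been freed
    // and both nodes are alive at the same time.
    assert(snapshot.use_count() == 1);
    return snapshot.get() == slot.get();
}
```

The function always returns false, because two live objects cannot share an address. With raw pointers the old node could already have been freed at that point, which is what makes the classic ABA scenario possible there.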