Search code examples
goatomicnonblocking

Implementing a lock-free unbounded queue with new atomic.Pointer types


I am trying to implement this non-blocking queue from Michael and Scott.

I am trying to use the new atomic.Pointer types introduced in Go 1.19 but I am getting a data race in my application.

Here is my implementation:

package queue

import (
    "errors"
    "sync/atomic"
)

// LockfreeQueue represents a FIFO structure with operations to enqueue
// and dequeue generic values.
// Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
type LockFreeQueue[T any] struct {
    head atomic.Pointer[node[T]]
    tail atomic.Pointer[node[T]]
}

// node represents a node in the queue
type node[T any] struct {
    value T
    next  atomic.Pointer[node[T]]
}

// newNode creates and initializes a node
func newNode[T any](v T) *node[T] {
    return &node[T]{value: v}
}

// NewQueue creates and initializes a LockFreeQueue
func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
    var head atomic.Pointer[node[T]]
    var tail atomic.Pointer[node[T]]
    n := &node[T]{}
    head.Store(n)
    tail.Store(n)
    return &LockFreeQueue[T]{
        head: head,
        tail: tail,
    }
}

// Enqueue adds a series of Request to the queue
func (q *LockFreeQueue[T]) Enqueue(v T) {
    n := newNode(v)
    for {
        tail := q.tail.Load()
        next := tail.next.Load()
        if tail == q.tail.Load() {
            if next == nil {
                if tail.next.CompareAndSwap(next, n) {
                    q.tail.CompareAndSwap(tail, n)
                    return
                }
            } else {
                q.tail.CompareAndSwap(tail, next)
            }
        }
    }
}

// Dequeue removes a Request from the queue
func (q *LockFreeQueue[T]) Dequeue() (T, error) {
    for {
        head := q.head.Load()
        tail := q.tail.Load()
        next := head.next.Load()
        if head == q.head.Load() {
            if head == tail {
                if next == nil {
                    return head.value, errors.New("queue is empty")
                }
                q.tail.CompareAndSwap(tail, next)
            } else {
                v := next.value
                if q.head.CompareAndSwap(head, next) {
                    return v, nil
                }
            }
        }
    }
}

// Check if the queue is empty.
func (q *LockFreeQueue[T]) IsEmpty() bool {
        return q.head.Load() == q.tail.Load()
}

I found a different implementation here which works in my application without a data race, but I can't seem to figure out what is exactly different between the two.

Any help or feedback is appreciated!


Solution

  • It turned out that changing some things fixed the issue.

    First change:

    var n = node[T]{}
    head.Store(&n)
    tail.Store(&n)
    

    Second change was changing the Dequeue() return signature.

    The final file looks like this:

    package queue
    
    import (
        "sync/atomic"
    )
    
    // LockfreeQueue represents a FIFO structure with operations to enqueue
    // and dequeue generic values.
    // Reference: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
    type LockFreeQueue[T any] struct {
        head atomic.Pointer[node[T]]
        tail atomic.Pointer[node[T]]
    }
    
    // node represents a node in the queue
    type node[T any] struct {
        value T
        next  atomic.Pointer[node[T]]
    }
    
    // newNode creates and initializes a node
    func newNode[T any](v T) *node[T] {
        return &node[T]{value: v}
    }
    
    // NewQueue creates and initializes a LockFreeQueue
    func NewLockFreeQueue[T any]() *LockFreeQueue[T] {
        var head atomic.Pointer[node[T]]
        var tail atomic.Pointer[node[T]]
        var n = node[T]{}
        head.Store(&n)
        tail.Store(&n)
        return &LockFreeQueue[T]{
            head: head,
            tail: tail,
        }
    }
    
    // Enqueue adds a series of Request to the queue
    func (q *LockFreeQueue[T]) Enqueue(v T) {
        n := newNode(v)
        for {
            tail := q.tail.Load()
            next := tail.next.Load()
            if tail == q.tail.Load() {
                if next == nil {
                    if tail.next.CompareAndSwap(next, n) {
                        q.tail.CompareAndSwap(tail, n)
                        return
                    }
                } else {
                    q.tail.CompareAndSwap(tail, next)
                }
            }
        }
    }
    
    // Dequeue removes a Request from the queue
    func (q *LockFreeQueue[T]) Dequeue() T {
        var t T
        for {
            head := q.head.Load()
            tail := q.tail.Load()
            next := head.next.Load()
            if head == q.head.Load() {
                if head == tail {
                    if next == nil {
                        return t
                    }
                    q.tail.CompareAndSwap(tail, next)
                } else {
                    v := next.value
                    if q.head.CompareAndSwap(head, next) {
                        return v
                    }
                }
            }
        }
    }
    
    // Check if the queue is empty.
    func (q *LockFreeQueue[T]) IsEmpty() bool {
        return q.head.Load() == q.tail.Load()
    }