I'm aware of blocking queues and lock-free queues, a great example of those implementations being provided by Scott et al., but are there any implementations of a lock-free blocking queue?
In a lock-free blocking queue, dequeue requires no locking, but if there are no items in the queue it blocks the consumer. Are there any implementations of such a beast? I'd prefer a C# implementation, but any implementation would technically work.
Update:
I think I end up with a race condition on line D14.1:
initialize(Q: pointer to queue_t)
        node = new node() // Allocate a free node
        node->next.ptr = NULL // Make it the only node in the linked list
        Q->Head = Q->Tail = node // Both Head and Tail point to it
        signal = new ManualResetEvent() // Create a manual reset event
enqueue(Q: pointer to queue_t, value: data_type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node->value = value // Copy enqueued value into node
E3:     node->next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:       tail = Q->Tail // Read Tail.ptr and Tail.count together
E6:       next = tail.ptr->next // Read next ptr and count fields together
E7:       if tail == Q->Tail // Are tail and next consistent?
E8:         if next.ptr == NULL // Was Tail pointing to the last node?
E9:           if CAS(&tail.ptr->next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:          signal.Set() // Signal to the blocking dequeues
E10.2:          break // Enqueue is done. Exit loop
E11:          endif
E12:        else // Tail was not pointing to the last node
E13:          CAS(&Q->Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:        endif
E15:      endif
E16:    endloop
E17:    CAS(&Q->Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node
dequeue(Q: pointer to queue_t, pvalue: pointer to data_type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:       head = Q->Head // Read Head
D3:       tail = Q->Tail // Read Tail
D4:       next = head->next // Read Head.ptr->next
D5:       if head == Q->Head // Are head, tail, and next consistent?
D6:         if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:           if next.ptr == NULL // Is queue empty?
D8.1:           signal.WaitOne() // Block until an enqueue
D8.X:           // remove the return --- return FALSE // Queue is empty, couldn't dequeue
D9:           endif
D10:          CAS(&Q->Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:        else // No need to deal with Tail
                // Read value before CAS, otherwise another dequeue might free the next node
D12:          *pvalue = next.ptr->value
D13:          if CAS(&Q->Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:          if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:            signal.Reset()
D14.3:          break // Dequeue is done. Exit loop
D15:          endif
D16:        endif
D17:      endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded
EDIT:
SIMPLER: I suggest you don't need both a head and a tail for your queue. Just keep a head. If head == NULL, the list is empty. Add items at the head. Remove items from the head. It's simpler and needs fewer CAS ops.
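A minimal sketch of the head-only idea, written in Java because its garbage collector sidesteps the node-reuse (ABA) problem that the counted pointers in the pseudocode above guard against. The class and method names (`HeadOnlyList`, `push`, `pop`) are made up for illustration. Note that adding and removing at the same end hands items out in LIFO order, so this is really a stack rather than a FIFO queue.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Head-only lock-free list (hypothetical name); one CAS per operation. */
final class HeadOnlyList<T> {
    private static final class Node<T> {
        final T value;
        Node<T> next;               // written before the node is published by CAS
        Node(T value) { this.value = value; }
    }

    // head == null means the list is empty
    private final AtomicReference<Node<T>> head = new AtomicReference<>(null);

    /** Add an item at the head; retries until the CAS succeeds. */
    void push(T value) {
        Node<T> node = new Node<>(value);
        Node<T> old;
        do {
            old = head.get();
            node.next = old;
        } while (!head.compareAndSet(old, node));
    }

    /** Remove the item at the head, or return null if the list is empty. */
    T pop() {
        Node<T> old;
        do {
            old = head.get();
            if (old == null) {
                return null;        // empty: the caller decides whether to sleep
            }
        } while (!head.compareAndSet(old, old.next));
        return old.value;
    }
}
```

In a language with manual memory management the same loop would need counted pointers or another reclamation scheme before a popped node could be freed safely.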
HELPER: I suggested in the comments that you need to think of a helper scheme to handle the race. In my version of what "lock free" means, it's OK to have rare race conditions if they don't cause problems. I'll take the extra performance in exchange for an idle thread occasionally sleeping a couple of ms too long.
Helper ideas. When a consumer grabs work it could check to see if there is a thread in a coma. When a producer adds work, it could look for threads in comas.
So track sleepers. Use a linked list of sleepers. When a thread decides there is no work, it marks itself as !awake and CASes itself onto the head of the sleeper list. When it receives a signal to wake up, the thread marks itself as awake. The newly awakened thread then cleans up the sleeper list.
You have to be careful when cleaning up a concurrent singly linked list, because you can only CAS at the head. So while the head of the sleeper list is marked awake, you can CAS the head off. If the head is not awake, continue scanning the list and "lazy unlink" (I made that term up) the remaining awake items. Lazy unlink is simple: just point the previous item's next ptr past the awake item. A concurrent scan will still make it to the end of the list even if it gets to items that are !awake. Subsequent scans see a shorter list.
Finally, any time you add work or pull off work, scan the sleeper list for !awake items. If a consumer notices that work remains after grabbing some (.next work != NULL), it can scan the sleeper list and signal the first thread that is !awake. After a producer adds work, the producer can scan the sleeper list and do the same.
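The sleeper list could look roughly like this in Java. This is a sketch under several assumptions, not the scheme exactly as described: the names `Sleeper`, `SleeperList`, `wakeOne` and `cleanup` are invented, `LockSupport.unpark` stands in for the wake-up signal, and the waker flips the `awake` flag with a CAS (rather than the woken thread setting it) so that two wakers never pick the same sleeper. Each sleep is assumed to use a fresh `Sleeper` node, since a lazily unlinked node may still be reachable by a concurrent scan.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** One entry per sleep; allocate a fresh one each time (assumption). */
final class Sleeper {
    final Thread thread;
    final AtomicBoolean awake = new AtomicBoolean(false);
    volatile Sleeper next;
    Sleeper(Thread thread) { this.thread = thread; }
}

final class SleeperList {
    private final AtomicReference<Sleeper> head = new AtomicReference<>(null);

    /** Called by a thread that found no work: CAS itself onto the head. */
    void add(Sleeper s) {
        Sleeper old;
        do {
            old = head.get();
            s.next = old;
        } while (!head.compareAndSet(old, s));
    }

    /** Signal the first sleeper that is still !awake; null if there is none. */
    Sleeper wakeOne() {
        for (Sleeper s = head.get(); s != null; s = s.next) {
            if (s.awake.compareAndSet(false, true)) {  // only one waker wins
                LockSupport.unpark(s.thread);
                return s;
            }
        }
        return null;
    }

    /** CAS awake entries off the head, then lazily unlink the remaining ones. */
    void cleanup() {
        Sleeper h;
        while ((h = head.get()) != null && h.awake.get()) {
            head.compareAndSet(h, h.next);             // retry on failure
        }
        Sleeper prev = h;                              // null, or was !awake when checked
        while (prev != null) {
            Sleeper cur = prev.next;
            if (cur != null && cur.awake.get()) {
                prev.next = cur.next;                  // lazy unlink: skip over cur
            } else {
                prev = cur;
            }
        }
    }

    /** Number of entries currently reachable from the head (for inspection). */
    int size() {
        int n = 0;
        for (Sleeper s = head.get(); s != null; s = s.next) n++;
        return n;
    }
}
```

A sleeping thread would loop on something like `while (!s.awake.get()) LockSupport.park();`. Because `unpark` leaves a permit behind, a wake-up that arrives just before the `park` call is not lost.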
If you have a broadcast scenario and can't signal a single thread, then just keep a count of sleeping threads. While that count is still > 0, a consumer noticing remaining work and a producer adding work would each broadcast the signal to wake up.
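One way the counting variant might be sketched in Java is shown below. The names (`SleepCounter`, `sleepUnlessWork`, `broadcastIfAnyAsleep`) are hypothetical, and `wait`/`notifyAll` on a monitor are swapped in for whatever broadcast primitive the platform offers. The monitor is only taken on the sleep and wake-up paths; the common case where nobody is asleep is a single atomic read.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class SleepCounter {
    private final AtomicInteger asleep = new AtomicInteger();
    private final Object bell = new Object();   // used only to sleep and broadcast

    /**
     * Sleep until a broadcast arrives, unless hasWork already reports work.
     * The count is raised before the re-check so a producer that adds work
     * afterwards sees asleep > 0. Callers should loop, since waits can end early.
     */
    void sleepUnlessWork(BooleanSupplier hasWork) {
        synchronized (bell) {
            asleep.incrementAndGet();
            try {
                if (!hasWork.getAsBoolean()) {
                    bell.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt
            } finally {
                asleep.decrementAndGet();
            }
        }
    }

    /** Called after adding work, or when a consumer sees work remaining. */
    boolean broadcastIfAnyAsleep() {
        if (asleep.get() == 0) {
            return false;                             // fast path, no monitor taken
        }
        synchronized (bell) {
            bell.notifyAll();
        }
        return true;
    }

    int asleepCount() {
        return asleep.get();
    }
}
```

This assumes that `hasWork` reads an atomic or volatile value, so that the sleeper's re-check and the producer's read of the count cannot both miss each other.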
In our environment, we have 1 thread per SMT, so the sleeper list can never be that large (well, unless I get my hands on one of those new 128-concurrent-thread machines!). We generate work items early in a transaction. In the first second we might generate 10,000 work items, and this production rapidly tapers off. Threads work for a couple of seconds on those work items. So we rarely have a thread in the idle pool.
YOU CAN STILL USE LOCKS: If you have only 1 thread and generate work rarely, this won't work for you. In that case the performance of mutexes is of no concern and you should just use them. Use a lock on the sleeper queue in this scenario. Think of lock-free as being "no locks where it counts".
PREVIOUS POST: Are you saying: There is a queue of work. There are many consumer threads. A consumer needs to pull off work and do it if there is any work. A consumer thread needs to sleep until there is work.
If so, we do this using only atomic operations, like this:
The queue of work is a linked list. There is also a linked list of sleeping threads.
To add work: CAS the head of the list to the new work. When work is added, we first check whether there are any threads on the sleeper list. If there are, then before adding the work we CAS a sleeper off the sleeper list, set its work = the new work, and signal the sleeper to wake up. Then we add the work to the work queue.
To consume work: CAS the head of the list to head->next. If the head of the work list is NULL, we CAS the thread onto the list of sleepers.
Once a thread has a work item, the thread must CAS the work item's state to WORK_INPROGRESS or some such. If that fails, it means the work is being performed by another thread, so the consumer thread goes back to search for work. If a thread wakes up and has a work item, it still has to CAS the state.
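The claim step can be sketched in Java as a single compare-and-set on the item's state. `WORK_INPROGRESS` is the name used above; `WORK_PENDING`, `WorkItem` and `tryClaim` are hypothetical names added for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class WorkItem {
    static final int WORK_PENDING = 0;      // hypothetical initial state
    static final int WORK_INPROGRESS = 1;   // state name taken from the text

    private final AtomicInteger state = new AtomicInteger(WORK_PENDING);
    final Runnable task;

    WorkItem(Runnable task) { this.task = task; }

    /**
     * Returns true for exactly one caller. A thread that gets false must not
     * run the task and should go back to searching for work.
     */
    boolean tryClaim() {
        return state.compareAndSet(WORK_PENDING, WORK_INPROGRESS);
    }
}
```

This is what makes it safe for the same item to be reachable both from the work queue and from a sleeper's ->work field: whichever thread reaches it first wins the CAS and the other moves on.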
So if work is added, a sleeping consumer is always woken up and handed the work. pthread_kill() always wakes a thread at sigwait(), because even if the thread gets to sigwait() after the signal was sent, the signal is still received. This solves the problem of a thread putting itself on the sleeper list but getting signaled before going to sleep. All that happens is that the thread tries to own its ->work if there is one. Failing to own the work, or not having any, sends the thread back to consume-start. If a thread fails to CAS itself onto the sleeper list, it means that either another thread beat it or the producer pulled off a sleeper. For safety, we have the thread act as if it had just been woken up.
We get no race conditions doing this, with multiple producers and consumers. We have also been able to extend this to let threads sleep on individual work items.