Atomically taking everything from a ConcurrentQueue


I have multiple threads generating items and sticking them in a common ConcurrentQueue:

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

I also have a single consumer thread. In this application, it occasionally needs to grab everything currently in the shared queue and remove it from that queue, all in one shot. Something like:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

I looked through the documentation (for the collection and the interfaces it implements) but didn't find anything like "concurrently take all objects from the queue", or even "concurrently swap contents with another queue".

I could do this no problem if I ditch the ConcurrentQueue and just protect a normal Queue with a lock, like this:

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<GeneratedItem>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

But I like the convenience of the ConcurrentQueue, and since I'm just learning C# I'm also curious about the API. So my question is: is there a way to do this with one of the concurrent collections?

Is there perhaps some way to access whatever synchronization object ConcurrentQueue uses and lock it for myself for my own purposes so that everything plays nicely together? Then I can lock it, take everything, and release?


Solution

  • It depends on what you want to do. According to a comment in the source code:

    //number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.
    

    Taking a snapshot works by internally calling ToList(), which in turn relies on the m_numSnapshotTakers counter and a spin mechanism:

    /// <summary>
    /// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
    /// cref="T:System.Collections.Generic.List{T}"/>.
    /// </summary>
    /// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
    /// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
    private List<T> ToList()
    {
       // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
       // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
       // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
       Interlocked.Increment(ref m_numSnapshotTakers);
    
       List<T> list = new List<T>();
       try
       {
           //store head and tail positions in buffer, 
           Segment head, tail;
           int headLow, tailHigh;
           GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
    
           if (head == tail)
           {
               head.AddToList(list, headLow, tailHigh);
           }
           else
           {
               head.AddToList(list, headLow, SEGMENT_SIZE - 1);
               Segment curr = head.Next;
               while (curr != tail)
               {
                   curr.AddToList(list, 0, SEGMENT_SIZE - 1);
                   curr = curr.Next;
               }
               //Add tail segment
               tail.AddToList(list, 0, tailHigh);
           }
       }
       finally
       {
           // This Decrement must happen after copying is over. 
           Interlocked.Decrement(ref m_numSnapshotTakers);
       }
       return list;
    }
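
To make the snapshot behaviour concrete, here is a minimal sketch that uses the public ToArray() method. The SnapshotDemo class and its Snapshot helper are hypothetical names introduced only for illustration. The point is that the copy is taken while the queue keeps all of its items.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper for illustration; not part of the framework.
public static class SnapshotDemo
{
    // Returns a point-in-time copy of the queue's contents.
    // Nothing is removed: the queue still holds every item afterwards.
    public static T[] Snapshot<T>(ConcurrentQueue<T> queue)
    {
        return queue.ToArray();
    }
}
```

Calling Snapshot on a queue holding two items should return an array of length two and leave the queue's Count at two.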
    

    If a snapshot is all you want, then you are in luck. However, there is seemingly no built-in way to get and remove all the items from a ConcurrentQueue in a thread-safe manner. You will need to provide your own synchronisation using lock or similar, or roll your own collection (which might not be all that difficult, looking at the source).
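
If a strictly atomic "take everything" is not required, one alternative worth considering is to drain the queue with repeated TryDequeue calls. The sketch below assumes a single consumer thread, as in the question; ConcurrentQueueDrain and DrainAll are hypothetical names, not framework APIs. It is not an atomic operation: items enqueued while the drain is running are simply left for the next call.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the framework.
public static class ConcurrentQueueDrain
{
    // Moves the items currently in 'source' into a new Queue<T>, in FIFO order.
    // Assumes only one thread ever dequeues from 'source'.
    public static Queue<T> DrainAll<T>(ConcurrentQueue<T> source)
    {
        var drained = new Queue<T>();

        // Bound the loop by the count observed on entry, so that fast
        // producers cannot keep the consumer in this loop indefinitely.
        int limit = source.Count;

        while (limit-- > 0 && source.TryDequeue(out T item))
        {
            drained.Enqueue(item);
        }

        return drained;
    }
}
```

With a single consumer, each item is returned exactly once and in the order it was enqueued. If several threads may drain at the same time, or a true point-in-time cut is needed, a lock around a plain Queue, as shown in the question, remains the simpler choice.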