
Working with data held in a ConcurrentQueue<T>


I have a background worker that streams data and saves it to a ConcurrentQueue<T>, which is what I need since it is a thread-safe first-in, first-out collection. However, I also need to do things like perform simple calculations on the data or pull items from this collection, and I'm not sure what I should use at this point. Here is some example pseudocode:

using System;
using System.Collections.Concurrent;
using System.Linq;

public class ExampleData
{
     public DateTime Date { get; set; }
     public decimal Value { get; set; }
}

public ConcurrentQueue<ExampleData> QueueCol { get; set; } = new();

public void AddToQueue(DateTime date, decimal value)
{
     QueueCol.Enqueue(new ExampleData() { Date = date, Value = value });
}

public void DisplayPastData()
{
     var count = QueueCol.Count();
     var prev1Data = count >= 2 ? QueueCol.ElementAt(count - 2) : null;
     var prev2Data = count >= 3 ? QueueCol.ElementAt(count - 3) : null;
     var prev3Data = count >= 4 ? QueueCol.ElementAt(count - 4) : null;

     if (prev1Data != null)
     {
         Console.WriteLine($"Date: {prev1Data.Date} Value: {prev1Data.Value}");
     }
     
     if (prev2Data != null)
     {
         Console.WriteLine($"Date: {prev2Data.Date} Value: {prev2Data.Value}");
     }

     if (prev3Data != null)
     {
         Console.WriteLine($"Date: {prev3Data.Date} Value: {prev3Data.Value}");
     }
}

This is a very rough example, but even when I only display the data, most of it looks correct and then I get dates completely out of left field, such as a date from the previous day in between dates from the current day. Because of ordering issues like that, I know the data isn't correct. So my question is: how do I convert the concurrent queue to a new collection that keeps the order and lets me work with the data without giving incorrect results?


Solution

  • The usage pattern you describe in your question makes ConcurrentQueue<T> an unsuitable collection for your scenario. As far as I can understand, the requirements are:

    1. The producer(s) should be able to enqueue items in the collection without being blocked for any amount of time.
    2. The consumer(s) should be able to perform calculations on a snapshot of the collection, without creating an expensive copy of the collection, and without interfering in any way with the producer(s).

    The collection that seems more suitable for your scenario out of the box is ImmutableList<T>. It can be updated with lock-free Interlocked operations, and each instance is essentially a snapshot by itself, because it is immutable. Here is how you could use it in a multithreaded scenario, with thread safety and without blocking any thread:

    using System;
    using System.Collections.Immutable;
    using System.Threading;
    
    private ImmutableList<ExampleData> _data = ImmutableList<ExampleData>.Empty;
    
    public ImmutableList<ExampleData> Data => Volatile.Read(ref _data);
    
    public void AddToQueue(DateTime date, decimal value)
    {
        var newData = new ExampleData() { Date = date, Value = value };
        ImmutableInterlocked.Update(ref _data, (x, y) => x.Add(y), newData);
    }
    
    public void DisplayPastData()
    {
        ImmutableList<ExampleData> snapshot = Volatile.Read(ref _data);
        int count = snapshot.Count;
        var prev1Data = count >= 2 ? snapshot[count - 2] : null;
        var prev2Data = count >= 3 ? snapshot[count - 3] : null;
        var prev3Data = count >= 4 ? snapshot[count - 4] : null;
    
        if (prev1Data != null)
        {
            Console.WriteLine($"Date: {prev1Data.Date} Value: {prev1Data.Value}");
        }
    
        if (prev2Data != null)
        {
            Console.WriteLine($"Date: {prev2Data.Date} Value: {prev2Data.Value}");
        }
    
        if (prev3Data != null)
        {
            Console.WriteLine($"Date: {prev3Data.Date} Value: {prev3Data.Value}");
        }
    }
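    To make the "lock-free" part concrete, here is a minimal, self-contained sketch. The types LockFreeLog<T> and LockFreeLogDemo are hypothetical, and the hand-written compare-and-swap loop in Append only illustrates the idea that ImmutableInterlocked.Update is built on; it is not the library's actual source. The demo runs several producer tasks concurrently and checks that a snapshot taken early is unaffected by later appends and that no append is lost.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type, for illustration only.
public sealed class LockFreeLog<T>
{
    private ImmutableList<T> _data = ImmutableList<T>.Empty;

    // Each read returns an immutable snapshot that later appends cannot change.
    public ImmutableList<T> Snapshot => Volatile.Read(ref _data);

    // Hand-written compare-and-swap loop, conceptually what
    // ImmutableInterlocked.Update does for a single Add (assumption, not its source).
    public void Append(T item)
    {
        ImmutableList<T> current = Volatile.Read(ref _data);
        while (true)
        {
            ImmutableList<T> updated = current.Add(item);
            // Publish 'updated' only if no other thread replaced _data in the meantime.
            ImmutableList<T> observed = Interlocked.CompareExchange(ref _data, updated, current);
            if (ReferenceEquals(observed, current)) return;
            current = observed; // Lost the race: retry against the newer list.
        }
    }
}

// Hypothetical demo type.
public static class LockFreeLogDemo
{
    // Returns the size of a snapshot taken before the producers start,
    // and the size of the list after all producers have finished.
    public static (int EarlyCount, int FinalCount) Run(int producers, int itemsPerProducer)
    {
        var log = new LockFreeLog<int>();
        log.Append(-1);
        ImmutableList<int> early = log.Snapshot;

        Task[] tasks = Enumerable.Range(0, producers)
            .Select(p => Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++) log.Append(p * itemsPerProducer + i);
            }))
            .ToArray();
        Task.WaitAll(tasks);

        return (early.Count, log.Snapshot.Count);
    }

    public static void Main()
    {
        var (early, final) = Run(producers: 4, itemsPerProducer: 1000);
        Console.WriteLine($"{early} {final}"); // prints "1 4001"
    }
}
```

    In real code, calling ImmutableInterlocked.Update as shown above is preferable to writing this loop by hand. The sketch just shows why no thread ever waits on a lock: a producer that loses a race simply rebuilds its list from the newer version and tries again.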
    

    The immutable collections are not without disadvantages. They are a lot slower than the normal collections (ImmutableList<T> is implemented as a balanced binary tree, so indexing and Add are O(log n) instead of O(1)), they require significantly more memory, and they create significantly more garbage every time they are updated.

    An optimal solution for your specific scenario could be a combination of a ConcurrentQueue<ExampleData> (recent data) and a List<ExampleData> (historic data). The producer(s) would enqueue items in the ConcurrentQueue<T>, and the single consumer would dequeue all the items from the ConcurrentQueue<T> and add them to the List<T>. It would then use the List<T> to do the calculations. Since only the consumer thread ever touches the List<T>, that list needs no synchronization.
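    A minimal sketch of that combination follows, assuming exactly one consumer thread owns the List<T>. The names DataFeed, Publish, DrainPending and LastValues are hypothetical. Producers only call Publish; the consumer calls DrainPending before each calculation and then works on an ordered list that no other thread modifies.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class ExampleData
{
    public DateTime Date { get; set; }
    public decimal Value { get; set; }
}

// Hypothetical type, for illustration only. Assumes a single consumer thread.
public sealed class DataFeed
{
    // Written by the producer(s), drained by the consumer: thread-safe FIFO.
    private readonly ConcurrentQueue<ExampleData> _recent = new();

    // Owned exclusively by the single consumer thread, so no locking is needed.
    private readonly List<ExampleData> _history = new();

    // Called by producer(s); never blocks.
    public void Publish(DateTime date, decimal value) =>
        _recent.Enqueue(new ExampleData { Date = date, Value = value });

    // Consumer only: moves all pending items into the history, preserving FIFO order.
    public int DrainPending()
    {
        int moved = 0;
        while (_recent.TryDequeue(out var item))
        {
            _history.Add(item);
            moved++;
        }
        return moved;
    }

    // Consumer only: up to 'count' of the most recent items, newest first.
    public List<ExampleData> LastValues(int count)
    {
        var result = new List<ExampleData>();
        for (int i = _history.Count - 1; i >= 0 && result.Count < count; i--)
            result.Add(_history[i]);
        return result;
    }

    public static void Main()
    {
        var feed = new DataFeed();
        var start = new DateTime(2024, 1, 1);
        for (int i = 0; i < 5; i++) feed.Publish(start.AddMinutes(i), i);

        // Consumer side: drain first, then calculate on the stable history.
        feed.DrainPending();
        foreach (ExampleData d in feed.LastValues(3))
            Console.WriteLine($"Date: {d.Date} Value: {d.Value}");
    }
}
```

    The design choice here is ownership rather than locking: the queue is the only shared structure, and the history list belongs to one thread. The trade-off is that a calculation only sees items published before the most recent DrainPending call.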