Tags: c#, multithreading, producer-consumer, concurrent-queue

How to properly use a ConcurrentQueue for writing in a file on a separate thread?


I have an application that receives data from an external source, and I want to write this data to a file on a separate thread, because writing it directly on the receiving thread could not keep up with the incoming data. It seems to me that ConcurrentQueue is suitable for this.

However, with the code below nothing is written to the file at all:

Declarations:

// Declare the queue and the consumer task as class members
private static ConcurrentQueue<DataBuffer> queue = new ConcurrentQueue<DataBuffer>();
private static Task consumerTask;

Consumer:

static void Consumer()
{
    // Open the binary file for writing
    using (var fileStream = File.OpenWrite("received_data.bin"))
    {
        // Try to dequeue an item from the queue
        if (queue.TryDequeue(out var dataBuffer))
        {
            // Write the data to the binary file
            fileStream.Write(dataBuffer.Buffer, 0, dataBuffer.Length);
        }
    }
}

The consumer task is started when my external interface is opened, i.e. when data is expected:

consumerTask = Task.Factory.StartNew(Consumer);

This is how I enqueue data in the event handler executed when data is received:

// Enqueue the received data in the queue
queue.Enqueue(new DataBuffer(e.Bytes, (int)e.NumBytesAvailable));

The DataBuffer class, which I use to pass both the data and its size through the queue:

public class DataBuffer
{
    public byte[] Buffer { get; set; }
    public int Length { get; set; }

    public DataBuffer(byte[] buffer, int length)
    {
        Buffer = buffer;
        Length = length;
    }
}

So I tried adding an AutoResetEvent. In that case the write happens only once, for a single received data packet:

private static AutoResetEvent queueEvent = new AutoResetEvent(false);
// Enqueue the received data in the queue
queue.Enqueue(new DataBuffer(e.Bytes, (int)e.NumBytesAvailable));
// Set the auto reset event to signal that there is data available in the queue
queueEvent.Set();

Consumer that waits for data:

static void Consumer()
{
    // Open the binary file for writing
    using (var fileStream = File.OpenWrite("received_data.bin"))
    {
        // Wait until there is data available in the queue
        queueEvent.WaitOne();
        
        // Try to dequeue an item from the queue
        if (queue.TryDequeue(out var dataBuffer))
        {
            // Write the data to the binary file
            fileStream.Write(dataBuffer.Buffer, 0, dataBuffer.Length);
        }
    }
}

Could you please help?


EDIT

I am now trying to use BlockingCollection, but at first glance the data received in the Consumer does not match what was added to the collection, as if a different part of the data buffer were being read. For example, 0x112233445566778899... goes into the collection and I receive 0x5566778899AABBCCDD...

EDIT2

This problem may be related to the fact that the received data does not arrive in complete packets, so the data buffer passed to the collection does not always start with 0x11223344556677... as in the example. It also seems that in the Consumer I either do not receive the buffers from the queue one by one, or some of the data passed to the queue is lost. As a result I see data that starts somewhere in the middle of a packet received from the external device, for example 0x55667788...

private static BlockingCollection<DataBuffer> collection = new BlockingCollection<DataBuffer>();

Adding data to the collection:

// Add the received data to the blocking collection
collection.Add(new DataBuffer(e.Bytes, (int)e.NumBytesAvailable));

In Consumer:

while(true)
{
    if (collection.TryTake(out var dataBuffer, Timeout.Infinite))
    {
        // Write the data to the binary file
        fileStream.Write(dataBuffer.Buffer, 0, dataBuffer.Length);
    }
}

EDIT3

I switched to the suggested, more modern Channel class, but I still get incorrect results. Here are the first 8 bytes of the producer and consumer buffers, in the order they were processed:

Producer: 12-AB-34-CD-56-EF-78-56
Producer: 12-AB-34-CD-56-EF-78-56
Producer: 56-EF-78-56-9A-43-BC-21
Consumer: 56-EF-78-56-9A-43-BC-21
Consumer: 56-EF-78-56-9A-43-BC-21
Consumer: 56-EF-78-56-9A-43-BC-21
Producer: 12-AB-34-CD-56-EF-78-56
Producer: 56-9A-43-BC-21-12-AB-34
Consumer: 56-9A-43-BC-21-12-AB-34
Producer: 21-12-AB-34-CD-56-EF-78
Consumer: 21-12-AB-34-CD-56-EF-78
Producer: CD-56-EF-78-56-9A-43-BC
Consumer: CD-56-EF-78-56-9A-43-BC
Producer: 21-12-AB-34-CD-56-EF-78

Code snippets for Channel:

// Declaration (class member)
private static Channel<DataBuffer> channel = Channel.CreateUnbounded<DataBuffer>();

// Producer side
channel.Writer.TryWrite(new DataBuffer(_ReceivedDataBuffer2, _ReceivedRawDataBufferPosition2));

// Consumer side
static async Task Consumer()
{
    using (var fileStream = File.OpenWrite("received_data.bin"))
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            // Try to read a DataBuffer instance from the channel
            if (channel.Reader.TryRead(out var dataBuffer))
            {
               System.Diagnostics.Debug.WriteLine("Consumer: " + BitConverter.ToString(dataBuffer.Buffer.Take(8).ToArray()));
               // Write the data to the binary file
               fileStream.Write(dataBuffer.Buffer, 0, dataBuffer.Length);
            }
        }
    }
}

EDIT4

According to suggestions from the comments, I went back to using BlockingCollection. Below is the minimal, reasonably cleaned-up code with which I handle this, to demonstrate that I use only one producer thread and one consumer. For testing purposes, I have added writing the data to a file also before passing it to the collection. This way I can compare the data before the queue and after the queue.

First, a few words about the received data. Normally, the e.Bytes buffer contains about 65 kB of valid data (e.NumBytesAvailable). These bytes consist of packets of about 1.5 kB each: data + sequence number + read_status.

The saved files differ. In the file written by the Consumer I see, for example, a correct sequence of packets ..., 41, 42, 43, after which packet 44 is incomplete and the sequence jumps to, say, packet 56. Packets 57, 58, 59 and 60 follow, and then the sequence goes back to some of the previously skipped packets, but not all of them: for example 48, 49, 50, ..., 80. So packets 57, 58, 59 and 60 are saved a second time, while packets 45 to 47 are missing from the file entirely.

This kind of rearrangement occurs continuously throughout the file written after the queue. The data in the file written before the queue is complete and in the correct order.

using System;
using System.IO;
using System.Management;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace test_app_1
{
    public class USB
    {
        // -----------------------------------------------------------------
        // Type of commands handled by FPGA constants:
        const byte CMD_DDC_REGISTER = 0x30;

        // -----------------------------------------------------------------
        // Type of application operations constants:
        const int MODE_NOP = 0;
        const int MODE_DDC_RAW_SAVE_TO_FILE = 16;

        // default operation mode:
        private int _OperationMode = MODE_NOP;

        // -----------------------------------------------------------------
        // Events declarations:
        public event EventHandler<CustomEventArgs> OnFtdiConnected;
        public event EventHandler OnFtdiDisconnected;
        public event EventHandler<CustomEventArgs> OnFtdiOpenedAndConfigured;

        public FtdiFifo FtdiFifo;
        private readonly string _AllowedSerialNumber = "FT786P7IA";
        public bool FtdiOpened = false;

        private byte[] _CommandBytes = new byte[10];

        private byte[] _ReceivedRawDataBuffer = new byte[150000];
        private uint _ReceivedNumberOfBytes = 0;

        static private FileStream _FileStream;
        static private FileStream _FileStreamBeforeQueue;

        private static Task consumerTask;
        private static BlockingCollection<DataBuffer> collection = new BlockingCollection<DataBuffer>();
        private DataBuffer producerDataBuffer;


        // -----------------------------------------------------------------
        // USB class constructor:
        // * configures automatic detection of events related to connected USB devices
        // -----------------------------------------------------------------
        public USB()
        {
            // ------------ autodetect USB hardware ------------------------
            // Set up the query to listen for device arrival and removal events
            WqlEventQuery query = new WqlEventQuery("SELECT * FROM __InstanceOperationEvent WITHIN 1 WHERE TargetInstance ISA 'Win32_USBHub'");

            // Set up the event watcher
            ManagementEventWatcher watcher = new ManagementEventWatcher(query);

            // Add event handlers to be notified when a device is connected or removed
            watcher.EventArrived += new EventArrivedEventHandler(Handler_DeviceChangedEvent);

            // Start listening for events
            watcher.Start();                 
        }


        // -----------------------------------------------------------------
        // EVENT HANDLER: for USB FTDI device connection events
        // -----------------------------------------------------------------
        private void Handler_DeviceChangedEvent(object sender, EventArrivedEventArgs e)
        {
            using (var moBase = (ManagementBaseObject)e.NewEvent.Properties["TargetInstance"].Value)
            {
                string devicePNPId = moBase?.Properties["PNPDeviceID"]?.Value.ToString();

                switch (e.NewEvent.ClassPath.ClassName)
                {
                    case "__InstanceDeletionEvent":                        
                        //BeginInvoke(new Action(() => UpdateUI(eventMessage)));
                        if (devicePNPId == @"USB\VID_0403&PID_6010\" + _AllowedSerialNumber.Substring(0, _AllowedSerialNumber.Length - 1)) //remove last character (A - channel indicator) from the serial number
                        {
                            FtdiOpened = false;
                            // Raise the event
                            OnFtdiDisconnected?.Invoke(this, EventArgs.Empty);
                        }
                        break;
                    case "__InstanceCreationEvent":                        
                        //BeginInvoke(new Action(() => UpdateUI(eventMessage)));
                        if(devicePNPId == @"USB\VID_0403&PID_6010\" + _AllowedSerialNumber.Substring(0, _AllowedSerialNumber.Length - 1)) //remove last character (A - channel indicator) from the serial number
                        {
                            // Raise the event
                            OnFtdiConnected?.Invoke(this, new CustomEventArgs(_AllowedSerialNumber.Substring(0, _AllowedSerialNumber.Length - 1)));
                        }
                        break;
                    case "__InstanceModificationEvent":
                    default:
                        break;
                }
            }
        }

        // -----------------------------------------------------------------
        // EVENT HANDLER: Handles event triggered when FTDI is configured
        // for synchronous 245 mode; generates event for detector class
        // -----------------------------------------------------------------
        private void Handler_OnFtdiOpenedAndSetToFifoMode(object sender, CustomEventArgs e)
        {
            OnFtdiOpenedAndConfigured?.Invoke(this, new CustomEventArgs(e.Value));
        }

        // -----------------------------------------------------------------
        // Open and configure FTDI device
        // -----------------------------------------------------------------
        public void OpenAndConfigureFTDI()
        {
            // FTDI object
            FtdiFifo = new FtdiFifo();

            // FTDI device detection
            FtdiFifo.OnFtdiOpenedAndSetToFifoMode += Handler_OnFtdiOpenedAndSetToFifoMode;
            FtdiFifo.IdentifyDevice(); // FTDI device detection
            if (FtdiFifo.IsDeviceAllowed(_AllowedSerialNumber)) // Is the connected device allowed? (_AllowedSerialNumber)
            {
                // FTDI chip configuration to FIFO mode
                FtdiFifo.OpenInFifoMode(_AllowedSerialNumber);

                FtdiOpened = FtdiFifo.Opened;

                // Subscription to Events:
                FtdiFifo.OnFtdiBytesReceived += Handler_OnFtdiBytesReceived;

                // Start the consumer task
                consumerTask = Task.Run(Consumer);
            }                
        }


        // -----------------------------------------------------------------
        // EVENT HANDLER: for FTDI USB data reception
        // -----------------------------------------------------------------
        void Handler_OnFtdiBytesReceived(object sender, FtdiBytesReceivedEventArgs e)
        {
            switch (_OperationMode)
            {
                case MODE_DDC_RAW_SAVE_TO_FILE:
                         
                    _ReceivedNumberOfBytes = e.NumBytesAvailable;
                    Array.Copy(e.Bytes, 0, _ReceivedRawDataBuffer, 0, _ReceivedNumberOfBytes);
                    

                    // Add the received data to the blocking collection
                    producerDataBuffer = new DataBuffer(_ReceivedRawDataBuffer, (int)_ReceivedNumberOfBytes);
                    collection.Add(producerDataBuffer);

                    // TEST PURPOSE ONLY - writing to separate file the same data which is enqueued in collection
                    // Data "before queue" and "after queue" can be compared later
                    _FileStreamBeforeQueue.Write(_ReceivedRawDataBuffer, 0, (int)_ReceivedNumberOfBytes);

                    break;

                default:
                    break;
            }
        }
        // -----------------------------------------------------------------
        // A method that fetches data from a queue and writes it to a file.
        // It works in a dedicated Task.
        // -----------------------------------------------------------------
        static void Consumer()
        {
            while (true)
            {
                foreach (DataBuffer dataBuffer in collection.GetConsumingEnumerable())
                {
                    _FileStream.Write(dataBuffer.Buffer, 0, dataBuffer.Length);
                }
            }
        }

        // -----------------------------------------------------------------
        // A method that sends a measurement start command to the device.
        // Used for button click handling.
        // -----------------------------------------------------------------
        public void DdcEnableConv()
        {
            _OperationMode = MODE_DDC_RAW_SAVE_TO_FILE;

            String FileName = "ddc_readout_" + DateTime.Now.ToString("yyyy_MM_dd_HH_mm_ss") + ".bin";
            String FileName2 = "ddc_readout_before_queue_" + DateTime.Now.ToString("yyyy_MM_dd_HH_mm_ss") + ".bin";
            _FileStream = new FileStream(FileName, FileMode.Create, FileAccess.Write);
            _FileStreamBeforeQueue = new FileStream(FileName2, FileMode.Create, FileAccess.Write);


            Array.Clear(_CommandBytes, 0, _CommandBytes.Length);      // zeroing command array
            _CommandBytes[0] = CMD_DDC_REGISTER;                      // setting command byte (first byte)
            _CommandBytes[1] = 0x07;                                  // setting second byte

            _CommandBytes[9] = 0x01;

            FtdiFifo.SendDataIntoFifo2(_CommandBytes);                // sending data through FTDI
        }

        // -----------------------------------------------------------------
        // A method that sends a measurement stop command to the device.
        // Used for button click handling.
        // -----------------------------------------------------------------
        public void DdcDisableConv()
        {
            _OperationMode = MODE_DDC_RAW_SAVE_TO_FILE;

            Array.Clear(_CommandBytes, 0, _CommandBytes.Length);      // zeroing command array
            _CommandBytes[0] = CMD_DDC_REGISTER;                      // setting command byte (first byte)
            _CommandBytes[1] = 0x07;                                  // setting second byte

            _CommandBytes[9] = 0x00;

            FtdiFifo.SendDataIntoFifo2(_CommandBytes);                // sending data through FTDI
        }

    }
}

// -----------------------------------------------------------------
// A class that defines an object used to transfer measurement data
// received from the FTDI (producer) to a Task that writes the data
// to a file (consumer).
//
// Allows two arguments to be sent: the data buffer and the length
// of valid data in the buffer.
// -----------------------------------------------------------------
public class DataBuffer
{
    public byte[] Buffer { get; set; }
    public int Length { get; set; }

    public DataBuffer(byte[] buffer, int length)
    {
        Buffer = buffer;
        Length = length;
    }
}

Solution

  • The ConcurrentQueue<T> does not include waiting/blocking capabilities, so it's not a sufficient tool by itself for implementing a producer-consumer scenario. The tool that you probably need is the BlockingCollection<T>. The standard way of consuming a BlockingCollection<T> is the GetConsumingEnumerable method:

    foreach (DataBuffer dataBuffer in collection.GetConsumingEnumerable())
    {
        // Write the dataBuffer to the file
    }
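
    As a rough illustration of the pattern above, here is a minimal, self-contained sketch of a single producer and a single consumer built on BlockingCollection<T>. The class name BlockingCollectionSketch and the method Run are hypothetical and exist only for this example. The sketch also copies each chunk into a fresh array before adding it. That is an assumption on my part: the EDIT4 code in the question appears to add the same _ReceivedRawDataBuffer array to the collection every time, so a later Array.Copy could overwrite data that is still waiting in the queue.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Threading.Tasks;

    // Hypothetical helper for illustration, not part of the question's code.
    public static class BlockingCollectionSketch
    {
        // Writes all chunks to `output` on a consumer task; returns bytes written.
        public static long Run(byte[][] chunks, Stream output)
        {
            using var collection = new BlockingCollection<byte[]>();

            // Consumer: blocks while the collection is empty; the loop ends once
            // CompleteAdding has been called and the collection has been drained.
            Task<long> consumer = Task.Run(() =>
            {
                long written = 0;
                foreach (byte[] chunk in collection.GetConsumingEnumerable())
                {
                    output.Write(chunk, 0, chunk.Length);
                    written += chunk.Length;
                }
                return written;
            });

            // Producer: each chunk is copied into a fresh array before Add, so a
            // receive buffer that is reused later cannot change queued data.
            foreach (byte[] received in chunks)
            {
                byte[] copy = new byte[received.Length];
                Array.Copy(received, copy, received.Length);
                collection.Add(copy);
            }

            collection.CompleteAdding();                 // no more data will arrive
            return consumer.GetAwaiter().GetResult();    // wait for the consumer
        }
    }
    ```

    In a real application the output would typically be a FileStream, and CompleteAdding would be called when the device is closed, so that the consumer loop ends and the file can be flushed and disposed.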
    

    This method returns an enumerator that blocks the current thread while the collection is empty. This is great if the consumer runs on a dedicated thread, but sometimes there is no particular reason for doing all the consuming work on the same thread. In that case a Channel<T> is preferable, because it can be consumed with a non-blocking enumerator, freeing a thread while the channel is empty. The standard way of consuming a Channel<T> is the ReadAllAsync method:

    await foreach (DataBuffer dataBuffer in channel.Reader.ReadAllAsync())
    {
        // Write the dataBuffer to the file
    }
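
    The same idea with a Channel<T> might look like the sketch below. The names ChannelSketch and RunAsync are hypothetical, and the SingleReader/SingleWriter options assume exactly one producer and one consumer, as described in the question. The loop ends only after the writer has been completed, so the sketch calls Complete once all data has been written.

    ```csharp
    using System.IO;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    // Hypothetical helper for illustration, not part of the question's code.
    public static class ChannelSketch
    {
        // Writes all chunks to `output` asynchronously; returns bytes written.
        public static async Task<long> RunAsync(byte[][] chunks, Stream output)
        {
            Channel<byte[]> channel = Channel.CreateUnbounded<byte[]>(
                new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });

            // Consumer: no thread is blocked while the channel is empty.
            Task<long> consumer = Task.Run(async () =>
            {
                long written = 0;
                await foreach (byte[] chunk in channel.Reader.ReadAllAsync())
                {
                    await output.WriteAsync(chunk, 0, chunk.Length);
                    written += chunk.Length;
                }
                return written;
            });

            // Producer: TryWrite succeeds on an unbounded channel as long as
            // the writer has not been completed. Each chunk is cloned so the
            // caller may reuse its own buffer afterwards.
            foreach (byte[] chunk in chunks)
            {
                channel.Writer.TryWrite((byte[])chunk.Clone());
            }

            channel.Writer.Complete();   // lets ReadAllAsync finish after draining
            return await consumer;
        }
    }
    ```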
    

    A third option is the ActionBlock<T> from the TPL Dataflow library, which is essentially a combination of an asynchronous buffer (like the Channel<T>) and a processing loop.

    ActionBlock<DataBuffer> actionBlock = new(dataBuffer =>
    {
        // Write the dataBuffer to the file
    });
    

    The TPL Dataflow library is intended mainly for more complex scenarios that involve multiple interconnected blocks communicating with each other, but there is nothing wrong with using a single ActionBlock<T> like in the above example. The only gotcha that I am aware of is that the action of the ActionBlock<T> swallows exceptions of type OperationCanceledException, instead of propagating them through the Completion property of the block.
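
    For completeness, a single ActionBlock<T> used as a file-writing consumer could be sketched as follows. The names ActionBlockSketch and RunAsync are hypothetical. Depending on the target framework, the System.Threading.Tasks.Dataflow NuGet package may need to be referenced. The sketch relies on the default block options, which process one message at a time in the order in which they were posted.

    ```csharp
    using System.IO;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Hypothetical helper for illustration, not part of the question's code.
    public static class ActionBlockSketch
    {
        // Posts all chunks to an ActionBlock that writes them to `output`;
        // returns the number of bytes written.
        public static async Task<long> RunAsync(byte[][] chunks, Stream output)
        {
            long written = 0;

            // The delegate runs for one message at a time (default options).
            var block = new ActionBlock<byte[]>(chunk =>
            {
                output.Write(chunk, 0, chunk.Length);
                written += chunk.Length;
            });

            // Producer: Post returns immediately; the block buffers the data.
            foreach (byte[] chunk in chunks)
            {
                block.Post((byte[])chunk.Clone());
            }

            block.Complete();         // signal that no more data will be posted
            await block.Completion;   // wait until every chunk has been written
            return written;
        }
    }
    ```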