Tags: c#, tpl-dataflow

Each element of InputQueue in TransformBlock<TInput, TOutput> is overwritten whenever a record is read


Purpose

I'm trying to create a pipeline where I read a file one record of bytes at a time and send each record to a BufferBlock, which queues the items. The BufferBlock is linked with the LinkTo() method to a TransformBlock<byte[], MyObject>, which converts each record of bytes into a MyObject instance. Below is the code that does all this:

    // Fields of the class that hosts ReadFileAndAppend().
    BufferBlock<byte[]> buffer = new BufferBlock<byte[]>();
    TransformBlock<byte[], MyObject> transform = new TransformBlock<byte[], MyObject>(bytes =>
    {
        return FromBytesToMyObject(bytes);
    });

    private void ReadFileAndAppend()
    {
        buffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });

        BinaryReader br = new BinaryReader(new FileStream("C:\\Users\\MyUser\\myFile.raw", FileMode.Open, FileAccess.Read, FileShare.Read));
        int count;
        byte[] record = new byte[4000];

        // Post more messages to the block.
        Task.Run(async () =>
        {
            while ((count = br.Read(record, 0, record.Length)) != 0)
                await buffer.SendAsync(record);
            buffer.Complete();
        });
        transform.Completion.Wait();
        Console.WriteLine("");
    }

Below is the method that is called inside the TransformBlock:

    public static MyObject FromBytesToMyObject(byte[] record)
    {
        // "object" is a C# keyword, so the local variable is named obj.
        MyObject obj = new MyObject();
        obj.counter = BitConverter.ToInt32(record, 0);
        obj.nPoints = BitConverter.ToInt32(record, 4);

        // The float values start right after the two 4-byte header fields.
        for (int i = 0; i < obj.nPoints; i++)
        {
            int index = i * 4;
            obj.A[i] = BitConverter.ToSingle(record, index + 8);
        }
        return obj;
    }
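The method above does not show where A is allocated, and it trusts nPoints exactly as read from the file. A more defensive variant might look like the sketch below. It assumes the layout implied by the offsets in the question (int32 counter at offset 0, int32 nPoints at offset 4, then nPoints float32 values from offset 8, in the machine's byte order, which is what BitConverter uses) and a hypothetical MyObject whose fields simply mirror the names used above; the RecordParser class name is also made up for illustration.

```csharp
using System;

// Hypothetical shape of MyObject: the question does not show its definition,
// so these fields only mirror the names used in FromBytesToMyObject.
public class MyObject
{
    public int counter;
    public int nPoints;
    public float[] A = Array.Empty<float>();
}

public static class RecordParser
{
    // Assumed header: counter (int32) followed by nPoints (int32).
    private const int HeaderSize = 8;

    public static MyObject FromBytesToMyObject(byte[] record)
    {
        if (record is null) throw new ArgumentNullException(nameof(record));
        if (record.Length < HeaderSize)
            throw new ArgumentException("Record is shorter than its 8-byte header.", nameof(record));

        var obj = new MyObject
        {
            counter = BitConverter.ToInt32(record, 0),
            nPoints = BitConverter.ToInt32(record, 4),
        };

        // Reject point counts that would read past the end of the record.
        if (obj.nPoints < 0 || obj.nPoints > (record.Length - HeaderSize) / sizeof(float))
            throw new ArgumentException(
                $"nPoints = {obj.nPoints} does not fit in a {record.Length}-byte record.", nameof(record));

        // A fresh array per object, so no two MyObject instances share storage.
        obj.A = new float[obj.nPoints];
        for (int i = 0; i < obj.nPoints; i++)
            obj.A[i] = BitConverter.ToSingle(record, HeaderSize + i * sizeof(float));

        return obj;
    }
}
```

Copying the floats into a per-object array also means the parsed MyObject no longer depends on the byte[] it came from, whatever happens to that array afterwards.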

As can be seen from the FromBytesToMyObject() method, each record contains a counter, and no two records in the file share the same counter value (I also verified this with a hex editor, HxD).
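The manual HxD check can also be done in code, independently of any Dataflow blocks, which helps separate "the file is wrong" from "the pipeline is wrong". The sketch below is a hypothetical helper (CounterCheck is not part of the question); it assumes fixed-size records whose first 4 bytes are the counter, and the default record size of 4000 is taken only from the buffer size in the question.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper: reads the file sequentially, with no pipeline involved,
// and collects the counter stored at the start of every record.
public static class CounterCheck
{
    public static List<int> ReadCounters(Stream stream, int recordSize = 4000)
    {
        var counters = new List<int>();
        // leaveOpen: true so the caller keeps ownership of the stream.
        using var reader = new BinaryReader(stream, Encoding.UTF8, leaveOpen: true);
        while (true)
        {
            // ReadBytes returns a new array and keeps reading until recordSize bytes
            // have been read or the stream ends.
            byte[] record = reader.ReadBytes(recordSize);
            // Stop at end of stream; a trailing fragment under 4 bytes has no counter.
            if (record.Length < sizeof(int))
                break;
            counters.Add(BitConverter.ToInt32(record, 0));
        }
        return counters;
    }

    // True if every counter is strictly greater than the previous one (hence unique).
    public static bool AreStrictlyIncreasing(IReadOnlyList<int> counters)
    {
        for (int i = 1; i < counters.Count; i++)
            if (counters[i] <= counters[i - 1])
                return false;
        return true;
    }
}
```

If this check passes on the real file while the TransformBlock still shows repeated counters, the duplicates are being introduced between reading and parsing, not by the data itself.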

Problem

With this setup I assumed the file was being read and interpreted correctly. But while debugging, with a breakpoint inside the while loop, I noticed after about 50 or more records that TransformBlock's OutputQueue contains groups of records with the same counter, i.e. identical records.

Example:

Expected queue, considering only the counters: 1,2,3,4,5,6,7,8,9,10.

Queue that I actually see in OutputQueue: 1,2,2,2,3,3,3,3,4,4,5,5 ...

Can you explain what I'm doing wrong?


Solution

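  • You are reusing the same byte[] record array over and over again, without any thread-safety considerations. SendAsync queues a reference to the array, not a copy of its contents, so by the time the TransformBlock processes a queued record, br.Read may already have overwritten that same array with a later record. No wonder things are going south fast. If you want to ensure the correctness of the whole operation, you must use a new array for each record: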

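    while (true)
    {
        // Allocate a new array per record, so queued records never share storage.
        var record = new byte[4000];
        var count = br.Read(record, 0, record.Length);
        if (count == 0) break;
        // Trim the array to the number of bytes actually read.
        Array.Resize(ref record, count);
        await buffer.SendAsync(record);
    }
    buffer.Complete();
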

    If you also care about performance and don't want to put extra pressure on the garbage collector, you should look at the ArrayPool<T> class. But be careful, because this class offers new ways to shoot yourself in the foot, for example returning an array to the pool while a downstream block is still reading it.
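The root cause can be reproduced without a file or any Dataflow blocks. The sketch below uses a MemoryStream of 10 records (counters 1 to 10) and a plain Queue<byte[]> as a stand-in for the block's internal queue; all names are hypothetical. The shared-buffer version mimics the question, and the fresh-buffer version swaps in BinaryReader.ReadBytes, which returns a new array on every call and keeps reading until the requested count or end of stream, in place of the answer's Read plus Array.Resize.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class AliasingDemo
{
    // Builds an in-memory "file" of `count` records whose first int32 is the counter 1..count.
    public static MemoryStream MakeFile(int count, int recordSize)
    {
        var ms = new MemoryStream();
        for (int c = 1; c <= count; c++)
        {
            var record = new byte[recordSize];
            BitConverter.GetBytes(c).CopyTo(record, 0);
            ms.Write(record, 0, record.Length);
        }
        ms.Position = 0;
        return ms;
    }

    // Mimics the question: one array is refilled and queued over and over.
    // The queue stores references, so every entry points at the same bytes.
    public static List<int> SharedBuffer(Stream stream, int recordSize)
    {
        var queue = new Queue<byte[]>(); // stand-in for the block's queue
        using var reader = new BinaryReader(stream, Encoding.UTF8, leaveOpen: true);
        var record = new byte[recordSize];
        while (reader.Read(record, 0, record.Length) != 0)
            queue.Enqueue(record); // same reference every time
        return Drain(queue);
    }

    // Mimics the fix: every record gets its own array.
    public static List<int> FreshBuffers(Stream stream, int recordSize)
    {
        var queue = new Queue<byte[]>();
        using var reader = new BinaryReader(stream, Encoding.UTF8, leaveOpen: true);
        byte[] record;
        while ((record = reader.ReadBytes(recordSize)).Length != 0)
            queue.Enqueue(record); // ReadBytes allocates a new array per call
        return Drain(queue);
    }

    // The "consumer" only runs after the producer has finished: the worst case.
    private static List<int> Drain(Queue<byte[]> queue)
    {
        var counters = new List<int>();
        while (queue.Count > 0)
            counters.Add(BitConverter.ToInt32(queue.Dequeue(), 0));
        return counters;
    }
}
```

Here SharedBuffer reports counter 10 for all ten entries, while FreshBuffers reports 1 through 10. In the real pipeline the TransformBlock runs concurrently with the reader, so it sometimes parses an array before it is overwritten and sometimes after, which is a plausible explanation for runs like 2,2,2,3,3,3 rather than all-identical values.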