EDIT & Update - I have now tried the same code on my personal computer and it works fine; I was able to copy any type of file with it without any problem. I only have this issue when I run the code on my work computer. I just don't understand how or why this depends on the computer. Please let me know whether I am missing anything here.
In the read task I sequentially read a file and add the bytes to a BlockingCollection, and in the consume task I read the data as it appears in the BlockingCollection and write it to a file. Since, by default, BlockingCollection is a wrapper over ConcurrentQueue, I expect reads from the blocking collection to come out in the same order the items were written to it.
But when I compare the destination file with the source, it is totally different, and sometimes I see duplicates.
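(For what it's worth, that ordering expectation is correct for a single producer and a single consumer; with the default ConcurrentQueue backing store, items come out of GetConsumingEnumerable in FIFO order. A minimal sketch, using made-up demo values:)

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class FifoDemo
{
    static void Main()
    {
        // Bounded collection, backed by the default ConcurrentQueue.
        var bc = new BlockingCollection<int>(boundedCapacity: 10);

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 100; i++)
                bc.Add(i);          // blocks whenever 10 items are queued
            bc.CompleteAdding();    // lets the consuming enumerable finish
        });

        int expected = 0;
        foreach (int item in bc.GetConsumingEnumerable())
        {
            // Items arrive in exactly the order they were added.
            if (item != expected) throw new Exception("order violated");
            expected++;
        }
        producer.Wait();
        Console.WriteLine(expected);
    }
}
```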
My source file is just a sequence of numbers, with each number on a new line, like below.
1
2
3
4
5
6
7
8
9
10
In my file I have around 5000 numbers so that the file has enough size. Is there something wrong with this code, or is this not the way a blocking collection is supposed to work? In this example I am writing to a file, but in reality I need to push this file to a REST API, and it is important for the data to be sent in sequence; if the bytes can't be sent in sequence, the file will be corrupted when it is stored on the server.
static void Main(string[] args)
{
    BlockingCollection<byte[]> bc = new BlockingCollection<byte[]>(10);

    Task consumeTask = Task.Factory.StartNew(() =>
    {
        var fs = File.OpenWrite(@"C:\Temp\pass_new.txt");
        foreach (byte[] data in bc.GetConsumingEnumerable())
        {
            fs.Write(data, 0, data.Length);
        }
        fs.Close();
    });

    Task readTask = Task.Factory.StartNew(() =>
    {
        var fs = File.OpenRead(@"C:\Temp\pass.txt");
        var bufferSize = 4096;
        var buffer = new byte[bufferSize];
        int bytesRead = 0;
        while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
        {
            byte[] dataToWrite = buffer;
            if (bytesRead < bufferSize)
            {
                dataToWrite = new byte[bytesRead];
                Array.Copy(buffer, dataToWrite, bytesRead);
            }
            bc.Add(dataToWrite);
        }
        fs.Close();
    }).ContinueWith(ant => bc.CompleteAdding());

    consumeTask.Wait();
}
I believe it is because you are re-cycling the buffer when bytesRead == bufferSize. Imagine these two sequences (I use the term "pointer" loosely here to refer to a reference variable; I think it gets the point across better).

First, when you are below the buffer size:

1. You make buffer point at a new byte array in memory, 4096 in size.
2. fs.Read writes 20 bytes to the object buffer points at.
3. You make dataToWrite point at the same object as buffer.
4. You make dataToWrite point at a new byte array, 20 bytes in size.
5. You copy the bytes from the object buffer points at to the object dataToWrite points at.
6. You store the object dataToWrite points at in the blocking collection.
7. fs.Read writes 30 bytes to the object buffer points at.

Now compare that to what happens if you meet the buffer size:

1. You make buffer point at a new byte array in memory, 4096 in size.
2. fs.Read writes 4096 bytes to the object buffer points at.
3. You make dataToWrite point at the same object as buffer.
4. You store the object dataToWrite points at in the blocking collection.
5. fs.Read writes 30 bytes to the object buffer points at.

Because dataToWrite, buffer, and the item you added to the blocking collection all point at the same object, the last fs.Read will modify the byte array that was just stored in the collection.

Drop your if statement, always allocate a new dataToWrite, and your program should work fine.
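To make that concrete, here is a sketch of the fixed pipeline with the if statement removed, so every chunk gets its own freshly allocated array. (Paths and the self-verification at the end are demo assumptions; the question uses C:\Temp\pass.txt.)

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

class CopyFixed
{
    static void Main()
    {
        // Hypothetical temp-file paths for a self-contained demo.
        string src = Path.Combine(Path.GetTempPath(), "pass_src.txt");
        string dst = Path.Combine(Path.GetTempPath(), "pass_dst.txt");
        File.WriteAllText(src, string.Join("\n", Enumerable.Range(1, 5000)));

        var bc = new BlockingCollection<byte[]>(10);

        Task consumeTask = Task.Run(() =>
        {
            using (var fs = File.Create(dst))
                foreach (byte[] data in bc.GetConsumingEnumerable())
                    fs.Write(data, 0, data.Length);
        });

        Task readTask = Task.Run(() =>
        {
            using (var fs = File.OpenRead(src))
            {
                var buffer = new byte[4096];
                int bytesRead;
                while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
                {
                    // Always allocate: the item stored in the collection
                    // must never alias the reusable read buffer.
                    byte[] dataToWrite = new byte[bytesRead];
                    Array.Copy(buffer, dataToWrite, bytesRead);
                    bc.Add(dataToWrite);
                }
            }
        }).ContinueWith(_ => bc.CompleteAdding());

        Task.WaitAll(readTask, consumeTask);

        // Verify the copy is byte-for-byte identical to the source.
        Console.WriteLine(File.ReadAllText(src) == File.ReadAllText(dst));
    }
}
```

The only behavioral change from the question's code is unconditionally copying into a new array; everything else (bounded collection, consuming enumerable, CompleteAdding in a continuation) stays the same.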