Solved!!! Thank you
Beware of what kind of object you put into Queue.If you put a value, such as int, then enqueue will make a copy and everyone is happy. If you put a reference, such as byte[], string, enqueue put this reference into queue, then here comes problem. If this reference is changed before consumer read it, the consumer will read changed version of data.
To avoid this issue, get a new version of frame reference right after enqueue, in rxThread. Code:
public void rxThreadFunc()
{
byte[] data = new byte[datalen];//declare for the first iteration.
int j = 0;
while (true)
{
for (int i = 0; i < rxlen; i++)
{
data[j] = (byte)i;
j++;
if (j >= datalen)
{
j = 0;
mQ.Add(data);
using (StreamWriter fwriter = new StreamWriter("C:\\testsave\\rxdata", true))
{
for (int k = 0; k < datalen; k++)
{
fwriter.Write(data[k]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
data = new byte[datalen];//create new reference after enqueue/Add
}
}
}
}//rxThreadFunc()
Update1 I just wrote another simpler code so everyone can test it without serialport hardware. click button to run program. I think thread priority causes this problem, without changing rxthread's thread priority the dataProc thread will get right data. but I still don't know why.
rxThread.Priority=ThreadPriority.Hightest
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Collections.Concurrent;
using System.Threading;
using System.IO;
namespace WindowsFormsApplication1
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
Thread rxThread = new Thread(rxThreadFunc);
rxThread.Priority = ThreadPriority.Highest;//this causes problem
rxThread.Start();
Thread procThread = new Thread(dataProc);
procThread.Start();
}
BlockingCollection<byte[]> mQ = new BlockingCollection<byte[]>();
int datalen = 30;
int rxlen = 200;
public void rxThreadFunc()
{
int j = 0;
while (true)
{
byte[] data = new byte[datalen];//is this in the right place?
for (int i = 0; i < rxlen; i++)
{
data[j] = (byte)i;
j++;
if (j >= datalen)
{
j = 0;
mQ.Add(data);
using (StreamWriter fwriter = new StreamWriter("C:\\testsave\\rxdata", true))
{
for (int k = 0; k < datalen; k++)
{
fwriter.Write(data[k]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
}
}
}//rxThreadFunc()
public void dataProc()
{
byte[] outData = new byte[datalen];
while (true)
{
if (mQ.Count > 1)
{
outData=mQ.Take();
using(StreamWriter fwriter=new StreamWriter("C:\\testsave\\dataProc",true))
{
for (int i = 0; i < datalen; i++)
{
fwriter.Write(outData[i]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
}
}
}
}
Description: I write this application which contains two thread. RxThread receives data from serial port, sort it by header mark 0x55 0xaa, put the following 30 bytes into a FrameStruct class, then put this FrameStruct into Queue. dataProcess thread get frame from Queue, then store it to disk.
SerialPort=(rxbuff)=>RxThread=(rxFrame,Queue)=>dataProcess==>disk
Problem: Data received and saved into disk by dataProcess thread is somehow corrupted.
Tried: This is what I have tried for your reference.
Also I think time received by dataProcess is also correct.
5. If I put a Thread.sleep(20) after Monitor.Pulse() in RxThread, problem solved, but I don't understand why??? And what if I changed to another computer?
This is the code snapshot.
//declared:
//Queue<FrameStruct>messageQ=new Queue<FrameStruct>;
//object _LockerMQ=new object();
private void RxThread()
{
int bytestoread, i;
bool f55 = false;//55 flag
bool fs = false;//frame start flag
int j=0;//data index in FrameStruct
int m_lMaxFram=32;
bytestoread = 0;
FrameStruct rxFrame = new FrameStruct((int)m_lMaxFrame);
while (true)
{
if (Serial_Port.IsOpen == true)
{
if ((bytestoread = Serial_Port.BytesToRead) > m_lMaxFrame*2)//get at least two frames
{
rxbuff = new byte[bytestoread];
Serial_Port.Read(rxbuff, 0, bytestoread);
for (i = 0; i < bytestoread; i++)
{
if (rxbuff[i] == 0x55)
{
f55 = true;
continue;
}
if (rxbuff[i] == 0xaa && f55)
{//frame header 0x55, 0xaa
//new frame start
fs = true;
f55 = false;
j = 0;//rxframe index;
rxFrame.time = DateTime.Now;//store the datetime when this thread gets this frame
continue;
}
if (fs && j < m_lMaxFrame - 2)
{//frame started but not ended
rxFrame.data[j] = rxbuff[i];
j++;
}
if (j >= (m_lMaxFrame - 2) && fs)
{//frame ended if j=30, reaches the end of rxFrame.data
fs = false;
lock(_LockerMQ)
{
messageQ.Enqueue(rxFrame);
Monitor.Pulse(_LockerMQ);
}
//Thread.Sleep(20);//if uncomment this sleep, problem solved
using (StreamWriter fWriter = new StreamWriter("c:\\testsave\\RXdata", true))//save rxThread result into a file rawdata
{
fWriter.Write(rxFrame.time.ToString("yyyy/MM/dd HH:mm:ss.fff"));
fWriter.Write(",");
for (int k = 0; k < m_lMaxFrame - 2; k++)
{
fWriter.Write(rxFrame.data[k]);
fWriter.Write(",");
}
fWriter.Write("\n");
}
}
}
}//if ((bytestoread=Serial_Port.BytesToRead) > 0)
rxbuff = null;
Thread.Sleep(20);
}//(Serial_Port.IsOpen==true)
Thread.Sleep(100);
}//while(true),RxThread sleep
}//private void RxThread()
DataProcess thread:
public void dataProcess()
{
while (true)
{
lock (_LockerMQ)
{
while (messageQ.Count < 1) Monitor.Wait(_LockerMQ);//get at least one frame data
f_NewFrame = messageQ.Count;
if (f_NewFrame > 0)
{
procFrame = messageQ.Dequeue();
using (StreamWriter fWriter = new StreamWriter("c:\\testsave\\dPdata", true))
{
fWriter.Write(procFrame.time.ToString("yyyy/MM/dd HH:mm:ss.fff"));
fWriter.Write(",");
for (int i = 0; i < m_lMaxFrame - 2; i++)
{
fWriter.Write(procFrame.data[i]);
fWriter.Write(",");
}
fWriter.Write("\n");
}
}//if(f_NewFrame>0)
}//lock(messageQ)
}
}
FrameStruct contains members of time, and data[30]
class FrameStruct
{
public FrameStruct(int m_lMaxFrame)
{
time = DateTime.Now;
data = new byte[m_lMaxFrame - 2];
}
public DateTime time;
public volatile byte[] data;//volatile doesn't help
}
rxData saved by RxThread is correct, shows:
2015/07/18 18:40:26.125,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,111,51,204,
2015/07/18 18:40:26.177,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,112,51,204,
2015/07/18 18:40:26.177,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,113,51,204,
2015/07/18 18:40:26.297,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,114,51,204,
2015/07/18 18:40:26.298,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,115,51,204,
2015/07/18 18:40:26.298,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,116,51,204,
2015/07/18 18:40:26.299,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,117,51,204,
2015/07/18 18:40:26.420,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,118,51,204,
//^this columns is accumulated number
dPdata saved by dataProcessThread is WRONG, shows:
2015/07/18 18:40:31.904,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,227,51,204,
2015/07/18 18:40:31.905,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,228,51,204,
2015/07/18 18:40:31.905,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,229,51,204,
2015/07/18 18:40:32.026,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,231,51,204,
2015/07/18 18:40:32.026,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,231,51,204,
2015/07/18 18:40:32.147,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,232,51,204,
2015/07/18 18:40:32.148,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,233,51,204,
2015/07/18 18:40:32.148,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,234,51,204,
2015/07/18 18:40:32.269,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,236,51,204,
2015/07/18 18:40:32.269,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,236,51,204,
2015/07/18 18:40:32.510,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,237,51,204,
2015/07/18 18:40:32.512,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.512,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.514,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.514,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,241,51,204,
2015/07/18 18:40:32.635,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,243,51,204,
2015/07/18 18:40:32.635,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,243,51,204,
//^this accumulated number is not correct
Please help!
Thank you!
Update 2
Here is basic producer consumer, just add your logic:
class Program
{
static BlockingCollection<int> mQ = new BlockingCollection<int>();
static void Main(string[] args)
{
Thread rxThread = new Thread(rxThreadFunc);
rxThread.Priority = ThreadPriority.Highest;//this causes problem
rxThread.Start();
Thread procThread = new Thread(dataProc);
procThread.Start();
Console.ReadLine();
}
static public void rxThreadFunc()
{
for (int i = 0; i < 10; i++)
{
mQ.Add(i);
}
}
static public void dataProc()
{
foreach (int outData in mQ.GetConsumingEnumerable())
{
Console.WriteLine(outData);
}
}
}
Answer for Update 1:
Now, thanks to the BlockingCollection that you are using, the consumer (dataProc) may be very simple (just remove the loop and count checking, it would do all this syncronization for you):
foreach (byte[] outData in _taskQ.GetConsumingEnumerable())
{
using(StreamWriter fwriter=new StreamWriter("C:\\testsave\\dataProc",true))
{
for (int i = 0; i < datalen; i++)
{
fwriter.Write(outData[i]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
Now, the problem may be different than in the original post. Maybe because here the producer writes the data to the file as well?
Original:
Its alot of explanations, but the solution is simple just add the initialization of FrameStruct to your second "if" statement:
rxFrame = new FrameStruct((int)m_lMaxFrame);
As shown in your saved data file: in the corrupted file, each time when you have missing value (for example 230) - you have other value duplicated (for example 231). The count of the missing values equals to the count of the duplicated values.
The reason for that is that you add reference to the same object instance to your queue. Let look on the following scenario: RxThread loops N times, before it context switched to the dataProcess thread, it adds N refferences to the same instance of FrameStruct to the queue. The data in this instance would be the data from the last read loop iteration before context switch. Now the context switch happens: the dataProcess loops M < N times before it context switched back to the RxThread, so it reads M elements from the queue, but all of them pointing to the same instance, so it writes M times same line to the file (the last one as explained before)
Now, why does the Thread.Sleep helps. The short answer: it makes a very hight probability to context switch to the dataProcess thread, each time that 1 element added to the queue by RxThread. So its actually: read one --> context switch --> write one... and same thing again.
The long answer would be: After dataProcess thread doing Monitor.Wait it goes to the waiting queue and context switch schedules the RxThread. Now the thread adds the first element to the queue and do Monitor.Pulse. This moves the dataProcess thread to the ready queue. But not necessarily schedules it for run immediately, so RxThread can make another iteration. But if you do the Thread.Sleep - it is a very hight probability that there will be context switch and ataProcess thread be scheduled now.