For a project, I'm generating synthetic data in order to build up an infrastructure for analysis purposes.
The infrastructure starts with a data simulation that generates the synthetic data. This data is sent via UDP to a Node-RED server and then handed over via MQTT to Kafka.
I simply generate a boolean combined with a timestamp. Now I want to analyse the time between those timestamps. The data should be generated with some time in between. So here's an example:
Data A:
{
    isActivated: false,
    Timestamp: "xxxxx"
}
Data B:
{
    isActivated: true,
    Timestamp: "xxxxx+deltaTime"
}
So B.Timestamp - A.Timestamp = deltaTime. So far so good, but when I add a Thread.Sleep(delay), the delay is added to the deltaTime calculated by the Kafka consumer. (I'm using the right timestamps for the calculation, the ones in the message, not those produced by Kafka itself. I tested this by adding 2 days to the timestamps produced in the DataGenerator.)
So here's the code example:
using System;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using System.Threading;

public class CustomData
{
    public DateTime Timestamp { get; set; }
    public bool isActivated { get; set; }
}

public class DataGenerator
{
    private bool DataAActivated { get; set; }

    // static, so the iPEndPoint field initializer below is allowed to reference it
    private static readonly IPAddress ipAddress = IPAddress.Parse("XXX.XXX.XXX.XXX");
    private readonly UdpClient udpClient = new UdpClient();
    private readonly IPEndPoint iPEndPoint = new IPEndPoint(ipAddress, XXXXX);

    public DataGenerator(bool dataAActivated)
    {
        DataAActivated = dataAActivated;
    }

    public void GenerateData(double delay, int deltaTime)
    {
        // Both timestamps are fixed here, before the sleep further down
        DateTime ts0 = DateTime.Now;
        DateTime ts2 = ts0.AddMilliseconds(deltaTime);

        // Declared outside the if/else so both are still in scope when sending
        CustomData dataA = new CustomData();
        CustomData dataB = new CustomData();

        if (DataAActivated)
        {
            dataA.isActivated = false;
            dataA.Timestamp = ts0;
            dataB.isActivated = true;
            dataB.Timestamp = ts2;
        }
        else
        {
            dataB.isActivated = false;
            dataB.Timestamp = ts0;
            dataA.isActivated = true;
            dataA.Timestamp = ts2;
        }

        // this is causing issues
        Thread.Sleep((int)delay);
        SendData(dataA);
        SendData(dataB);
    }

    private void SendData(CustomData data)
    {
        var options = new JsonSerializerOptions
        {
            WriteIndented = true
        };
        byte[] jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(data, options);

        // Pass the endpoint per datagram instead of calling Connect/Close on every
        // send: a closed UdpClient cannot be reused for the second message.
        udpClient.Send(jsonUtf8Bytes, jsonUtf8Bytes.Length, iPEndPoint);
    }
}
---------------- within Kafka-Consumer ----------------
public double CalcDuration(CustomData dataA, CustomData dataB)
{
    double duration = dataB.Timestamp.Subtract(dataA.Timestamp).TotalMilliseconds;
    Console.WriteLine($"duration: {duration}");
    return duration;
}
With a deltaTime of 100 ms and a delay of 500 ms, the output looks like this without the Thread.Sleep(delay) line:
duration: 100ms
duration: 100ms
...
With Thread.Sleep(delay):
duration: 600ms
duration: 600ms
...
Does anybody have a hint for me on how to fix this?
I hope I could make it clear, but don't hesitate to let me know if this post needs editing.
Thanks a lot,
Greetings
I've found the issue. As Peter and Marian mentioned, there is no failure caused by the code I posted in the question.
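A minimal sketch of why the generator could not be responsible, assuming the same order of operations as GenerateData: both timestamps are fixed before the sleep, so the sleep only postpones sending and cannot change their difference. The class and method names here (SleepDoesNotShiftDelta, MeasureDelta) are made up for illustration.

```csharp
using System;
using System.Threading;

public static class SleepDoesNotShiftDelta
{
    // Mirrors the order in GenerateData: take both timestamps, then sleep.
    public static double MeasureDelta(int delayMs, int deltaMs)
    {
        DateTime ts0 = DateTime.Now;
        DateTime ts2 = ts0.AddMilliseconds(deltaMs);

        // The sleep happens after both timestamps already exist.
        Thread.Sleep(delayMs);

        // Same calculation as CalcDuration in the consumer.
        return ts2.Subtract(ts0).TotalMilliseconds;
    }

    public static void Main()
    {
        // Prints 100, regardless of the delay passed in.
        Console.WriteLine(MeasureDelta(500, 100));
    }
}
```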
Their comments prompted me to go through the code of the other services. After cleaning up all Kafka-cluster-related topics and starting to produce again, I saw that in the second service of the producer/consumer chain I had set up the initial state wrongly.
So the first message was skipped, and therefore the duration was calculated between one timestamp from before the delay and one from after it. That's because of a constraint I set: the duration is only calculated between messages at the same offset.
To make that clear: data which is sent to Kafka arrives in the topic "ingestion". The messages there look like this:
ingestion:
(1)dataA
(2)dataB
(3)dataA
(4)dataB
...
From there I dispatch the messages into two topics, "activated" and "notActivated", because the timestamp of notActivated data is always greater than that of activated data. To ensure that no packet loss or similar has happened, I check this by holding the last state (the isActivated attribute of CustomData) in the dispatcher. That state was initially set wrongly, so what happened is:
notActivated:
(1)dataA //--> this entry was missing: it was skipped because it matched the wrongly initialised state (false)
(4)dataB
activated:
(2)dataB
(3)dataA
So the calculation of the duration was
duration = (4)dataB - (2)dataB
and because of that the duration was:
duration = deltaTime + delay
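The effect can be reproduced with a simplified model. This is only a sketch under assumptions, not the actual dispatcher: each topic is modelled as a plain list of timestamps in milliseconds, consecutive pairs are spaced exactly one delay apart, and the names (OffsetPairingSketch, BuildTopics, DurationsByOffset, earlier, later) are hypothetical. Dropping the first entry of the topic that holds the later timestamps shifts every same-offset pair by one message.

```csharp
using System;
using System.Collections.Generic;

public static class OffsetPairingSketch
{
    // Builds `count` pairs: pair i starts at i * delayMs and ends deltaMs later.
    public static (List<double> earlier, List<double> later) BuildTopics(
        int count, double deltaMs, double delayMs)
    {
        var earlier = new List<double>();
        var later = new List<double>();
        for (int i = 0; i < count; i++)
        {
            double start = i * delayMs;
            earlier.Add(start);
            later.Add(start + deltaMs);
        }
        return (earlier, later);
    }

    // Pairs the messages that share an offset (list index): later - earlier.
    public static List<double> DurationsByOffset(List<double> earlier, List<double> later)
    {
        int n = Math.Min(earlier.Count, later.Count);
        var durations = new List<double>();
        for (int i = 0; i < n; i++)
        {
            durations.Add(later[i] - earlier[i]);
        }
        return durations;
    }

    public static void Main()
    {
        var (earlier, later) = BuildTopics(3, 100, 500);

        // Topics aligned: every duration equals deltaTime.
        Console.WriteLine(string.Join(", ", DurationsByOffset(earlier, later)));

        // First message of the later topic skipped: offsets are shifted by one pair,
        // so every duration becomes deltaTime + delay.
        later.RemoveAt(0);
        Console.WriteLine(string.Join(", ", DurationsByOffset(earlier, later)));
    }
}
```

With a deltaTime of 100 ms and a delay of 500 ms, the first line printed is 100, 100, 100 and the second is 600, 600, which matches the durations shown in the question.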
Thanks a lot again to Peter and Marian
maudeees