Tags: c#, apache-kafka, confluent-kafka-dotnet, kafka-partition

My Kafka topic has three partitions. I cyclically read the newest data from all partitions, but one reading cycle takes too long. Why?


In my use case, the data source (a CNC machine) sends data (the flow value of a liquid) to a Kafka topic at one-second intervals. A mechanism on the source side ensures that the data is sent to Kafka in the correct order. The Kafka topic has three partitions (that's a mandatory standard in our company). Kafka itself decides which partition each incoming message is written to (I cannot change this either).

With my C# code I have to draw a live chart of the flow data (I use the Confluent-Kafka library). Therefore I always try to read the latest data that the CNC machine has sent to Kafka. Since Kafka writes the data to a different partition each time, I do the following every second:

  1. From each of the three partitions P0, P1, and P2, I read the last incoming (newest) message using Offset.End.

  2. I compare the timestamps of these "newest" messages from P0, P1, and P2.

  3. I take the one with the greatest timestamp and pass this data to my live chart.

But I think I am doing something wrong in my code, because one reading cycle from Kafka takes 7-8 seconds on average. (I have marked the part that takes 2-3 seconds for each partition.)

Is there a mistake in my code? Or is my strategy wrong? I can definitely say that there is no network or Kafka performance problem, as other consumers can consume data from the same Kafka server without problems.

My code:

// Required namespaces: System, System.IO, System.Threading, Confluent.Kafka
public void Read_from_Kafka()
    {
        try
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = server_uri,
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Latest,
                SecurityProtocol = SecurityProtocol.Ssl,
                SslCaLocation = path_1,
                SslCertificateLocation = path_2,
                SslKeyLocation = path_3,
                SslKeyPassword = password_string,
                EnableAutoCommit = false
            };

            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken cancellationToken = source.Token;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                // Partitions are assigned manually with consumer.Assign() below,
                // so no group subscription via Subscribe() is made here.
                while (var_true)
                {
                    //Reading newest data from partition-0
                    TopicPartitionOffset tps_0 = new TopicPartitionOffset(new TopicPartition("My_Topic", 0),Offset.End);
                    // this part of code takes 2-3 seconds-start*****
                    consumer.Assign(tps_0);
                    var consumeResult_0 = consumer.Consume(cancellationToken);
                    // this part of code takes 2-3 seconds-stop*****
                    Kafka_message_P0 = consumeResult_0.Message.Value;                                          
                    
                    //Reading newest data from partition-1
                    TopicPartitionOffset tps_1 = new TopicPartitionOffset(new TopicPartition("My_Topic", 1), Offset.End);
                    // this part of code takes 2-3 seconds-start*****
                    consumer.Assign(tps_1);
                    var consumeResult_1 = consumer.Consume(cancellationToken);
                    // this part of code takes 2-3 seconds-stop*****
                    Kafka_message_P1 = consumeResult_1.Message.Value;
                                            
                    //Reading newest data from partition-2
                    TopicPartitionOffset tps_2 = new TopicPartitionOffset(new TopicPartition("My_Topic", 2), Offset.End);
                    // this part of code takes 2-3 seconds-start*****
                    consumer.Assign(tps_2);
                    var consumeResult_2 = consumer.Consume(cancellationToken);
                    // this part of code takes 2-3 seconds-stop*****
                    Kafka_message_P2 = consumeResult_2.Message.Value;
                                           
                    // Reading the time stamps of the last written message in each partition, and finding out the newest (most actual) data.
                    // Using >= with else-if guarantees that exactly one value is chosen,
                    // even when two partitions report the same timestamp.
                    if (TimeStamp_dateTime_P0 >= TimeStamp_dateTime_P1 && TimeStamp_dateTime_P0 >= TimeStamp_dateTime_P2)
                    {
                        newest_Kafka_value = Kafka_value_P0;
                    }
                    else if (TimeStamp_dateTime_P1 >= TimeStamp_dateTime_P2)
                    {
                        newest_Kafka_value = Kafka_value_P1;
                    }
                    else
                    {
                        newest_Kafka_value = Kafka_value_P2;
                    }

                    // send this data to live chart
                    System.Threading.Thread.Sleep(1000);
                }
                consumer.Close();
            }
        }
        catch(Exception ex)
        {
            using (StreamWriter sw = File.AppendText(error_log))
            {
                sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
            }
        }           
        
    }

Solution

  • The problem is the company mandate of 3 partitions.

    A replication factor of 3 is probably what they meant.

    Kafka doesn't sort data outside of one partition, and never sorts by timestamp. You're also not guaranteed to get 3 records for every iteration of your loop.
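Because ordering only exists inside a single partition, any global time order has to be rebuilt on the client. The following is a minimal sketch, assuming the records polled in one iteration have already been copied into a hypothetical `FlowRecord` type (partition, UTC timestamp, value). It tolerates partitions that returned nothing, and it sorts by timestamp instead of relying on per-partition order, since Kafka does not sort by timestamp either.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record type: one flow reading copied out of a Kafka message.
public sealed record FlowRecord(int Partition, DateTime TimestampUtc, string Value);

public static class PartitionMerge
{
    // Combines the records fetched from several partitions into one list ordered by timestamp.
    // A partition that returned no records in this iteration is just an empty sequence.
    // OrderBy is a stable sort; ThenBy(Partition) makes the order of equal timestamps deterministic.
    public static List<FlowRecord> MergeByTimestamp(IEnumerable<IEnumerable<FlowRecord>> perPartition)
    {
        return perPartition
            .SelectMany(records => records)
            .OrderBy(r => r.TimestampUtc)
            .ThenBy(r => r.Partition)
            .ToList();
    }
}
```

The newest reading is then simply the last element of the merged list, if the list is not empty.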


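    The assign API must disconnect from a broker, then discover the leader broker of the new partition every time. It's a blocking API call. In addition, Offset.End positions the consumer after the last existing record, so Consume only returns once a new record is written to that partition. If one record per second is spread over three partitions, a given partition receives a new record only about every three seconds, which is consistent with the 2-3 seconds measured per partition.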
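Rather than calling Assign for one partition at a time in every cycle, the consumer can be assigned all three partitions once (Assign also accepts a list of TopicPartitionOffset values) and then poll continuously, keeping whichever record carries the latest timestamp. The sketch below covers only that bookkeeping, so it runs without a broker. `NewestRecordTracker` and `Offer` are hypothetical names, and the timestamps are assumed to be UTC values such as `Message.Timestamp.UtcDateTime`.

```csharp
using System;

// Hypothetical helper: remembers the newest record seen on any assigned partition.
public sealed class NewestRecordTracker
{
    public bool HasValue { get; private set; }
    public int Partition { get; private set; } = -1;
    public DateTime TimestampUtc { get; private set; } = DateTime.MinValue;
    public string Value { get; private set; } = string.Empty;

    // Offers one consumed record. Returns true if it replaced the current newest record,
    // which means the live chart should be updated. Older or equally old records are ignored,
    // so a late record from a slower partition cannot move the chart backwards.
    public bool Offer(int partition, DateTime timestampUtc, string value)
    {
        if (HasValue && timestampUtc <= TimestampUtc)
        {
            return false;
        }

        HasValue = true;
        Partition = partition;
        TimestampUtc = timestampUtc;
        Value = value;
        return true;
    }
}
```

Wired to Confluent.Kafka, the loop would call `consumer.Assign` once with `new TopicPartitionOffset("My_Topic", 0, Offset.End)` and the same for partitions 1 and 2, then repeatedly call `consumer.Consume(TimeSpan)`. That overload returns null when nothing arrived within the timeout, in which case the chart keeps its previous value; for a non-null result, `result.Partition.Value`, `result.Message.Timestamp.UtcDateTime` and `result.Message.Value` would be passed to `Offer`.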

    You could instead plot the data from all partitions and then use the max aggregation of your plotting tool (such as Grafana or Prometheus) to show only the maximum value per time interval.
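The "max per time interval" reduction that such a tool applies can also be computed in the client before plotting. A minimal sketch, assuming the flow values have already been parsed into doubles; `FlowAggregation.MaxPerInterval` is a hypothetical helper, and windows are aligned to whole multiples of the interval counted from `DateTime` tick 0.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FlowAggregation
{
    // Groups (timestamp, value) samples into fixed, non-overlapping windows of length `interval`
    // and returns the maximum value of each window, ordered by window start.
    public static List<(DateTime WindowStartUtc, double Max)> MaxPerInterval(
        IEnumerable<(DateTime TimestampUtc, double Value)> samples, TimeSpan interval)
    {
        if (interval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(interval), "Interval must be positive.");
        }

        long step = interval.Ticks;
        return samples
            .GroupBy(s => s.TimestampUtc.Ticks / step) // window index
            .OrderBy(g => g.Key)
            .Select(g => (WindowStartUtc: new DateTime(g.Key * step, DateTimeKind.Utc),
                          Max: g.Max(s => s.Value)))
            .ToList();
    }
}
```

Windows without any sample are simply absent from the result; a chart would typically draw a gap or hold the previous value there.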