There is a Kafka topic called "WebMessages" with 2 partitions.
We have two consumer groups on the same server, each hosted in a different IIS site.
One of the consumers does not receive any messages. The other one misses most of the messages.
When I wrote a simple consumer on my local computer, it also missed some messages. Any idea what's going wrong?
Here is the producer code:
_producerConfig = new ProducerConfig {
    BootstrapServers = _addressWithPort,
    Acks = Acks.All
};
using (var p = new ProducerBuilder<string, string>(_producerConfig).Build())
{
    p.ProduceAsync(_topicName, new Message<string, string>
    {
        Key = ldtoKafkaMessage.Key,
        Value = ldtoKafkaMessage.Message
    }).ContinueWith(task =>
    {
        if (task.IsFaulted)
        {
            TraceController.TraceError(Common.Enums.TraceEventCategories.X, "Key|Message", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + task.Exception.Message + " " + task.Exception.InnerException+ " " + task.Exception.StackTrace);
        }
        else
        {
            TraceController.TraceInformation(Common.Enums.TraceEventCategories.X, "Key|Message|Result", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + "Success");
        }
    });
}
So I have made sure that the messages are handed to the producer.
Here is the consumer code:
_consumerConfig = new ConsumerConfig
{
    BootstrapServers = _addressWithPort,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    GroupId = consumerGroupId
};
using (var c = new ConsumerBuilder<string, string>(_consumerConfig).Build())
{
    c.Subscribe(_topicName);
    CancellationTokenSource cts = new CancellationTokenSource();
    try
    {
        while (!cts.IsCancellationRequested)
        {
            try
            {
                var cr = c.Consume(cts.Token);
                LdtoKafkaMessage.Key = cr.Key;
                LdtoKafkaMessage.Message = cr.Value;
                this.OnMessageChanged();
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occured: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ensure the consumer leaves the group cleanly and final offsets are committed.
        c.Close();
    }
}
Some of your messages might not be produced, because you dispose of the producer without waiting for it to finish sending. You can ensure that messages are delivered by using the await keyword on the ProduceAsync method, or by calling Flush() before disposing of the producer.
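As a minimal sketch of both options, assuming the Confluent.Kafka .NET client used in the question: the class name ProducerSketch, the method names, and the 10-second flush timeout are hypothetical choices made for illustration, and Console.WriteLine stands in for the question's TraceController logging. The second method swaps ProduceAsync(...).ContinueWith(...) for the callback-based Produce overload, which is one common way to pair fire-and-forget sends with Flush.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper class, not part of the original code.
public static class ProducerSketch
{
    // Option 1: await ProduceAsync, so the using block is not left
    // (and the producer not disposed) until the delivery has completed or failed.
    public static async Task SendAndAwaitAsync(
        string bootstrapServers, string topic, string key, string value)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            Acks = Acks.All
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            try
            {
                DeliveryResult<string, string> result = await producer.ProduceAsync(
                    topic, new Message<string, string> { Key = key, Value = value });
                Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }

    // Option 2: queue the message without waiting, then call Flush
    // before the producer is disposed so queued messages are actually sent.
    public static void SendAndFlush(
        string bootstrapServers, string topic, string key, string value)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            Acks = Acks.All
        };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            producer.Produce(
                topic,
                new Message<string, string> { Key = key, Value = value },
                report =>
                {
                    // The delivery handler reports the final outcome of each message.
                    if (report.Error.IsError)
                    {
                        Console.WriteLine($"Delivery failed: {report.Error.Reason}");
                    }
                });

            // Blocks until outstanding messages are delivered or the
            // (assumed) 10-second timeout elapses.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```

Either variant keeps the producer alive until the broker has acknowledged the message, which the original fire-and-forget call inside the using block does not guarantee.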