I am using the Confluent Kafka package in a recent C# project. I created a producer in the following way:
var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx" };
foreach (var msg in msglist)
{
    using (var producer = new ProducerBuilder<Null, string>(prodConfig).Build())
    {
        producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
    }
}
The issue is that some of my messages never reach the consumer; they get lost somewhere. However, if I await the ProduceAsync call, all the messages are delivered. How can I deliver all my messages without await? (I have a single partition.)
First of all, you should use a single Producer to send your msglist, because creating a new Producer for each message is really expensive.
What you could do is use the Produce() method together with Flush(). Produce() sends messages asynchronously without waiting for a response, and calling Flush() afterwards blocks until all in-flight messages are delivered.
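var prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx" };
// One producer for the whole batch.
using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();
foreach (var msg in msglist)
{
    // Queues the message and returns immediately.
    producer.Produce(topic, new Message<Null, string> { Value = msg });
}
// Blocks until every queued message has been delivered.
producer.Flush();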
Without await or Flush(), you might lose messages because your producer might be disposed before all the messages are delivered.
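If you also want to know when a message fails rather than silently losing it, Produce() accepts an optional delivery handler. The following is only a sketch under assumptions: the broker address "localhost:9092", the topic name "my-topic", the sample msglist, and the 10-second timeout are all placeholders that are not part of the question.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        // Placeholder values, replace with your own.
        var topic = "my-topic";
        var msglist = new List<string> { "first", "second", "third" };
        var prodConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using var producer = new ProducerBuilder<Null, string>(prodConfig).Build();

        foreach (var msg in msglist)
        {
            // The handler is invoked once per message with the delivery result.
            producer.Produce(topic, new Message<Null, string> { Value = msg }, report =>
            {
                if (report.Error.IsError)
                {
                    Console.Error.WriteLine($"Delivery failed: {report.Error.Reason}");
                }
            });
        }

        // Wait up to 10 seconds for outstanding messages before the producer is disposed.
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
```

The timeout overload of Flush() bounds how long the call can block, which may be preferable to an indefinite wait if the broker is unreachable; in that case messages still in the queue when the timeout expires are not guaranteed to be delivered.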