I am trying to write messages to Kafka, and my producer is shown below. When I use Produce, I can pass a delivery handler and access the DeliveryReport. When I use ProduceAsync, the return type is DeliveryResult. How do I get the DeliveryReport and log the reason for a failure?
Using Produce:
public void WriteMessage(string message)
{
    using (var producer = new ProducerBuilder<string, string>(this._config).Build())
    {
        producer.Produce(this._topicName, new Message<string, string>()
        {
            Key = rand.Next(5).ToString(),
            Value = message
        },
        (deliveryReport) =>
        {
            if (deliveryReport.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
            }
        });
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
In the code above I have access to DeliveryReport, which inherits from DeliveryResult, so I can read both the error reason and DeliveryResult's TopicPartitionOffset. Below is the metadata:
namespace Confluent.Kafka
{
    //
    // Summary:
    //     The result of a produce request.
    public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
    {
        public DeliveryReport();
        //
        // Summary:
        //     An error (or NoError) associated with the message.
        public Error Error { get; set; }
        //
        // Summary:
        //     The TopicPartitionOffsetError associated with the message.
        public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
    }
}
Using ProduceAsync:
public async Task WriteAsyncMessage(string message)
{
    using (var producer = new ProducerBuilder<string, string>(this._config).Build())
    {
        var deliveryReport = await producer.ProduceAsync(this._topicName, new Message<string, string>()
        {
            Key = rand.Next(5).ToString(),
            Value = message
        });
        producer.Flush(TimeSpan.FromSeconds(60));
    }
}
In the method above, using ProduceAsync, how do I get access to a DeliveryReport so that I can log the error reason just as with Produce? Awaiting ProduceAsync returns a DeliveryResult, not a DeliveryReport.
Also, which is better to use when writing to Kafka: Produce or ProduceAsync?
I think I found the solution. ProduceAsync reports a failed delivery by throwing a ProduceException, so the error reason can be logged in a catch block:
using (var producer = new ProducerBuilder<string, string>(this._config).Build())
{
    try
    {
        // The awaited task completes once the delivery report for this message arrives.
        var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
        {
            Key = rand.Next(5).ToString(),
            Value = message
        });
        Console.WriteLine($"KAFKA => Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
    }
    catch (ProduceException<string, string> e)
    {
        // A failed delivery surfaces here; e.Error carries the code and reason.
        Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
    }
    // Nothing is left in flight after the await, so this Flush should return immediately.
    producer.Flush(TimeSpan.FromSeconds(60));
}
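As a follow-up, here is a minimal sketch of the same idea wrapped in a helper, assuming the Confluent.Kafka package is referenced. The class name KafkaWriteSketch and the method name TryWriteAsync are hypothetical and not part of the library. It also reads e.DeliveryResult, which the exception exposes alongside e.Error, so the failure log can include where the message was headed:

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper for illustration; only the Confluent.Kafka types are real.
public static class KafkaWriteSketch
{
    // Returns true when the message was delivered, false when delivery failed.
    public static async Task<bool> TryWriteAsync(
        IProducer<string, string> producer, string topic, string key, string value)
    {
        try
        {
            DeliveryResult<string, string> result = await producer.ProduceAsync(
                topic, new Message<string, string> { Key = key, Value = value });

            Console.WriteLine($"KAFKA => Delivered '{result.Value}' to '{result.TopicPartitionOffset}'");
            return true;
        }
        catch (ProduceException<string, string> e)
        {
            // e.Error holds the code and reason; e.DeliveryResult holds the
            // message and the topic/partition it was being sent to.
            Console.WriteLine(
                $"Failed to deliver message to '{e.DeliveryResult.TopicPartition}': " +
                $"{e.Error.Code} - {e.Error.Reason}");
            return false;
        }
    }
}
```

The helper takes an already built producer so that a caller can reuse one instance across messages instead of building one per call. On the second question: awaiting each ProduceAsync call waits for that message's delivery before the next one is sent, so when throughput matters, Produce with a delivery handler is generally the better fit, while ProduceAsync is convenient when the caller needs the outcome before continuing.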