Tags: .net, kafka-producer-api, confluent-kafka-dotnet

Kafka producer with Async not returning DeliveryReport but DeliveryResult


I am trying to write messages to Kafka. With Produce, I can pass a delivery handler and access the DeliveryReport. With ProduceAsync, however, the return type is DeliveryResult. How do I get a DeliveryReport so that I can log the reason for a failure?

Using Produce:

    public void WriteMessage(string message)
    {
        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {
            producer.Produce(this._topicName, new Message<string, string>()
            {
                Key = rand.Next(5).ToString(),
                Value = message
            },
            (deliveryReport) =>
            {
                if (deliveryReport.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                }
                else
                {
                    Console.WriteLine($"KAFKA => Delivered '{deliveryReport.Value}' to '{deliveryReport.TopicPartitionOffset}'");
                }
            });

            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }

In the code above I get a DeliveryReport, which inherits from DeliveryResult, so I can read both the error reason (Error.Reason) and the DeliveryResult's TopicPartitionOffset. Here is the class metadata:

    namespace Confluent.Kafka
    {
        //
        // Summary:
        //     The result of a produce request.
        public class DeliveryReport<TKey, TValue> : DeliveryResult<TKey, TValue>
        {
            public DeliveryReport();

            //
            // Summary:
            //     An error (or NoError) associated with the message.
            public Error Error { get; set; }
            //
            // Summary:
            //     The TopicPartitionOffsetError associated with the message.
            public TopicPartitionOffsetError TopicPartitionOffsetError { get; set; }
        }
    }

Using ProduceAsync:

    public async Task WriteAsyncMessage(string message)
    {
        using (var producer = new ProducerBuilder<string, string>(this._config).Build())
        {
            // The awaited value is a DeliveryResult<string, string>, not a DeliveryReport.
            var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
            {
                Key = rand.Next(5).ToString(),
                Value = message
            });

            producer.Flush(TimeSpan.FromSeconds(60));
        }
    }

In the method above, which uses ProduceAsync, how do I get access to a DeliveryReport so that I can log the error reason just as with Produce? Awaiting ProduceAsync returns a DeliveryResult, not a DeliveryReport.

Also, which is better to use when writing to Kafka: Produce or ProduceAsync?


Solution

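  • I think I found the solution. ProduceAsync does not hand back a DeliveryReport; on failure it throws a ProduceException<TKey, TValue>, and the exception's Error property carries the reason. So wrap the await in try/catch: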

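            using (var producer = new ProducerBuilder<string, string>(this._config).Build())
            {
                try
                {
                    // Success path: the awaited DeliveryResult describes where the message landed.
                    var deliveryResult = await producer.ProduceAsync(this._topicName, new Message<string, string>()
                    {
                        Key = rand.Next(5).ToString(),
                        Value = message
                    });

                    Console.WriteLine($"KAFKA => Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
                }
                catch (ProduceException<string, string> e)
                {
                    // Failure path: the error that Produce would report via DeliveryReport.Error.
                    Console.WriteLine($"Failed to deliver message: {e.Error.Reason}");
                }
                // Flush waits for messages still in flight; after an awaited ProduceAsync there should be none.
                producer.Flush(TimeSpan.FromSeconds(60));
            }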
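
If you want the same logging code for both Produce and ProduceAsync, one option is to turn the ProduceAsync outcome back into a DeliveryReport yourself. The sketch below is only an illustration: DeliveryLogging, ProduceWithReportAsync and ToReport are hypothetical names of mine, not part of Confluent.Kafka. It assumes the exception's DeliveryResult property is populated on failure, which is what I would expect for delivery errors, but check this against the library version you are using.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class DeliveryLogging
{
    // Awaits ProduceAsync and normalizes both outcomes into a DeliveryReport,
    // so the caller can inspect report.Error exactly as in a Produce handler.
    public static async Task<DeliveryReport<string, string>> ProduceWithReportAsync(
        IProducer<string, string> producer, string topic, Message<string, string> message)
    {
        try
        {
            DeliveryResult<string, string> result = await producer.ProduceAsync(topic, message);
            return ToReport(result, new Error(ErrorCode.NoError));
        }
        catch (ProduceException<string, string> e)
        {
            // Assumption: e.DeliveryResult is non-null and describes the failed message.
            return ToReport(e.DeliveryResult, e.Error);
        }
    }

    // Copies the position and message from a DeliveryResult and attaches the error.
    public static DeliveryReport<string, string> ToReport(
        DeliveryResult<string, string> result, Error error)
    {
        return new DeliveryReport<string, string>
        {
            Topic = result.Topic,
            Partition = result.Partition,
            Offset = result.Offset,
            Message = result.Message,
            Error = error
        };
    }
}
```

The caller then does `var report = await DeliveryLogging.ProduceWithReportAsync(producer, topic, msg);` and checks `report.Error.Code != ErrorCode.NoError`, the same test as in the Produce callback. On Produce vs. ProduceAsync: as far as I understand the client documentation, awaiting each ProduceAsync call in turn sends one message at a time, which is fine for low volume or request-handler code, while Produce with a delivery handler is the more efficient choice when throughput matters. Either way, it is probably worth keeping one long-lived producer instead of building a new one per message.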