Search code examples
asp.net-coreapache-kafkakafka-producer-apiapache-kafka-connect

Kafka - Error in Instantiating Producer Class in ASP.Net Core


I am using Confluent.Kafka

string kafkaEndpoint = "127.0.0.1:9092";

        string kafkaTopic = "testtopic";

        var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };

        using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
        {
            // Send 10 messages to the topic
            for (int i = 0; i < 10; i++)
            {
                var message = $"Event {i}";
                var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
                Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
            }
        }

I am getting the following compile time error:

Producer.ProduceAsync(TopicPartition, Message)' is inaccessible due to its protection level

Using ProducerBuilder like this:

var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass>{ Value = myObject}).GetAwaiter().GetResult();

shows error:

Cannot convert from 'Confluent.Kafka.Message<Confluent.Kafka.Null, MyClass>' to 'Confluent.Kafka.Message<Confluent.Kafka.Null, string>

Solution

  • In Confluent.Kafka nuget v1.0.1, Producer class is an internal class, that is it's not accessible. Looks like you need to use ProducerBuilder instead e.g. as:

    var producerConfig = new Dictionary<string, string> { { "bootstrap.servers", kafkaEndpoint } };
    
    using (var producer = new ProducerBuilder<Null, string>(producerConfig)
        .SetKeySerializer(Serializers.Null)
        .SetValueSerializer(Serializers.Utf8)
        .Build())
    {
        // Send 10 messages to the topic
        for (int i = 0; i < 10; i++)
        {
            var message = $"Event {i}";
            var result = producer.ProduceAsync(kafkaTopic, new Message<Null, string>{ Value = message}).GetAwaiter().GetResult();
            Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
        }
    }
    

    Looks like an instance of arbitrary class could be sent as (substitute MyClass with target class):

            var result = producer.ProduceAsync(kafkaTopic, new Message<Null, MyClass>{ Value = myObject}).GetAwaiter().GetResult();