Search code examples
asp.netapache-kafka.net-coreproducer-consumer

.Net Core Kafka ProduceAsync doesn't complete


I'm trying to make a proof of concept, simple Kafka producer and consumer. The platform I'm using is .Net Core 2.0, built by Visual Studio 2017. The Nuget Package I'm using is Confluent.Kafka

I've found some code through research that has had the best success, but now it seems to have trouble when implementing a .ProduceAsync() method. No error codes appear, and the program seems to continue to run, but otherwise doesn't execute the method.

Here is my Producer:

class Producer
    {
        static void Main(string[] args)
        {
            Console.WriteLine("PRODUCER!!!");

            // The Kafka endpoint address
            string kafkaEndpoint = "127.0.0.1:9092";

            // The Kafka topic we'll be using
            string kafkaTopic = "testtopic";

            // Create the producer configuration
            var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };

            // Create the producer
            using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
            {
                Console.WriteLine("Producer Created!");
                // Send 10 messages to the topic
                for (int i = 0; i < 10; i++)
                {
                    var message = $"Event {i}";
                    Console.WriteLine($"Begin Event {i}");
                    var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
                    Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
                    Console.WriteLine("Press any key to continue...");
                    Console.ReadLine();
                }
            }
        }
    }

And here is my Consumer:

class Consumer
    {
        static void Main(string[] args)
        {
            Console.WriteLine("CONSUMER!!!");

            // The Kafka endpoint address
            string kafkaEndpoint = "127.0.0.1:9092";

            // The Kafka topic we'll be using
            string kafkaTopic = "testtopic";

            // Create the consumer configuration
            var consumerConfig = new Dictionary<string, object>
            {
                { "group.id", "myconsumer" },
                { "bootstrap.servers", kafkaEndpoint },
            };

            // Create the consumer
            using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Subscribe to the OnMessage event
                consumer.OnMessage += (obj, msg) =>
                {
                    Console.WriteLine($"Received: {msg.Value}");
                };

                // Subscribe to the Kafka topic
                consumer.Subscribe(new List<string>() { kafkaTopic });

                // Handle Cancel Keypress 
                var cancelled = false;
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cancelled = true;
                };

                Console.WriteLine("Ctrl-C to exit.");

                // Poll for messages
                while (!cancelled)
                {
                    consumer.Poll(3000);
                    //consumer.Poll();
                }
            }
        }
    }

As I run the code, the Consumer sits idle because it appears that no messages are produced, but here is my console output for the Producer:

PRODUCER!!!
Producer Created!
Begin Event 0

(And that's it. Console also sits idle at this point.)

The question is: How can this be fixed so that the messages are in fact produced, and so that the consumer can receive them?


Solution

  • You can force to get the result by changing the line in producer to:

    var result = producer.ProduceAsync(kafkaTopic, null, message).Result;
    

    Check the result contents to find what went wrong.