I'm trying to make a proof of concept, simple Kafka producer and consumer. The platform I'm using is .Net Core 2.0, built by Visual Studio 2017. The Nuget Package I'm using is Confluent.Kafka
I've found some code through research that has had the best success, but now it seems to have trouble when implementing a .ProduceAsync() method. No error codes appear, and the program seems to continue to run, but otherwise doesn't execute the method.
Here is my Producer:
class Producer
{
static void Main(string[] args)
{
Console.WriteLine("PRODUCER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the producer configuration
var producerConfig = new Dictionary<string, object> { { "bootstrap.servers", kafkaEndpoint } };
// Create the producer
using (var producer = new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)))
{
Console.WriteLine("Producer Created!");
// Send 10 messages to the topic
for (int i = 0; i < 10; i++)
{
var message = $"Event {i}";
Console.WriteLine($"Begin Event {i}");
var result = producer.ProduceAsync(kafkaTopic, null, message).GetAwaiter().GetResult();
Console.WriteLine($"Event {i} sent on Partition: {result.Partition} with Offset: {result.Offset}");
Console.WriteLine("Press any key to continue...");
Console.ReadLine();
}
}
}
}
And here is my Consumer:
class Consumer
{
static void Main(string[] args)
{
Console.WriteLine("CONSUMER!!!");
// The Kafka endpoint address
string kafkaEndpoint = "127.0.0.1:9092";
// The Kafka topic we'll be using
string kafkaTopic = "testtopic";
// Create the consumer configuration
var consumerConfig = new Dictionary<string, object>
{
{ "group.id", "myconsumer" },
{ "bootstrap.servers", kafkaEndpoint },
};
// Create the consumer
using (var consumer = new Consumer<Null, string>(consumerConfig, null, new StringDeserializer(Encoding.UTF8)))
{
// Subscribe to the OnMessage event
consumer.OnMessage += (obj, msg) =>
{
Console.WriteLine($"Received: {msg.Value}");
};
// Subscribe to the Kafka topic
consumer.Subscribe(new List<string>() { kafkaTopic });
// Handle Cancel Keypress
var cancelled = false;
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
// Poll for messages
while (!cancelled)
{
consumer.Poll(3000);
//consumer.Poll();
}
}
}
}
As I run the code, the Consumer sits idle because it appears that no messages are produced, but here is my console output for the Producer:
PRODUCER!!!
Producer Created!
Begin Event 0
(And that's it. Console also sits idle at this point.)
The question is: How can this be fixed so that the messages are in fact produced, and so that the consumer can receive them?
You can force to get the result by changing the line in producer to:
var result = producer.ProduceAsync(kafkaTopic, null, message).Result;
Check the result contents to find what went wrong.