I have installed Kafka on Ubuntu and managed to test a simple scenario just with shell files:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname
followed by:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning
All good, everything works. Now, I want to close the producer and write the code for producer in an asp.net core application.
using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;
namespace KafkaTraining
{
class Program
{
static void Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
}
Console.ReadLine();
}
}
}
Inspiration found here: // https://docs.confluent.io/current/clients/dotnet.html#
So, the above C# code is supposed to be equivalent with the shell command, right, plus the message I wrote in C# that should be consumed by the consumer (the consumer is still open in a terminal window thanks to a shell command).
If the shell command I posted earlier works, namely bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname
with just that additional info related to localhost:9092 (address of service) and topic name, why doesn't the C# program successfully substitute that, it should produce the message "a log message" and the terminal should consume it, but does not. And how could I debug this?
PS. I have installed Kafka on Linux Ubuntu from this address: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz whereas in my Asp.Net Core 3.1 console app, I have installed as you can see version 1.5.0, not sure if that plays a part, but no clue on how to start debugging ... thansk for any pointers.
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<StartupObject>KafkaDemo.Program</StartupObject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.0" />
</ItemGroup>
</Project>
Your message is not being produced, because you dispose the producer before the message is delivered. Produce
method sends a message asynchronously and doesn't wait for the response - it returns immediately. To ensure that it is sent, you can use Flush()
- which blocks until all in-flight messages are delivered, or you can use await ProduceAsync()
.
Try this:
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
producer.Flush();
}