Tags: c#, ubuntu, apache-kafka, publish-subscribe, kafka-producer-api

Why is the message from my ProducerBuilder in a .NET Core console app not consumed by the Kafka console consumer?


I have installed Kafka on Ubuntu and managed to test a simple scenario using only the bundled shell scripts:

  • started ZooKeeper with the default config
  • started a single broker with the default config
  • created a topic with a single partition and a replication factor of 1, pointing it at the default ZooKeeper address
  • opened a producer and a consumer:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname

followed by:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning

All good, everything works. Now I want to close the shell producer and write the producer code in a .NET Core console application instead.

using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;

namespace KafkaTraining
{
    class Program
    {
        static void Main(string[] args)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };

            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
            }
            Console.ReadLine();
        }
    }
}

Inspiration found here: https://docs.confluent.io/current/clients/dotnet.html#

So the C# code above is supposed to be equivalent to the shell producer command, right? The message written in C# should then be picked up by the consumer, which is still running in a terminal window from the earlier shell command.

The shell command I posted earlier, bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname, works with nothing more than the broker address (localhost:9092) and the topic name. Why doesn't the C# program work as a substitute? It should produce the message "a log message" and the consumer in the terminal should print it, but nothing arrives. How can I debug this?

PS. I installed Kafka 2.6.0 on Ubuntu from this address: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz. In my .NET Core 3.1 console app I installed version 1.5.0 of the Confluent.Kafka package, as you can see in the project file. I'm not sure whether that plays a part, and I have no idea where to start debugging. Thanks for any pointers.

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <StartupObject>KafkaDemo.Program</StartupObject>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="1.5.0" />
  </ItemGroup>

</Project>

Solution

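  • Your message is never delivered because the producer is disposed before the send completes. Produce() is asynchronous: it only places the message in the client's internal queue and returns immediately, without waiting for the broker's acknowledgement. Here the using block ends right after the call, so the producer is disposed while the message is still queued. To make sure the message is actually sent, either call Flush(), which blocks until all in-flight messages have been delivered, or use await ProduceAsync(), which completes once the delivery result is known.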

    Try this:

    using (var producer = new ProducerBuilder<string, string>(config).Build())
    {
        producer.Produce("mytopicname", new Message<string, string> { Value = "a log message" });
        producer.Flush();
    }
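
The ProduceAsync() route mentioned above could look roughly like the sketch below, which also reports why a delivery failed and so gives a starting point for debugging. It assumes the same broker address and topic as in the question. The MessageTimeoutMs value and the wording of the console messages are illustrative choices, not something taken from the question.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace KafkaTraining
{
    class Program
    {
        // async Main is available in C# 7.1 and later (the default for netcoreapp3.1).
        static async Task Main(string[] args)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                // Assumption: give up after 10 s so that a failure surfaces quickly.
                MessageTimeoutMs = 10000
            };

            using (var producer = new ProducerBuilder<string, string>(config)
                // Client-level errors (for example, broker unreachable) are reported here.
                .SetErrorHandler((_, e) => Console.WriteLine($"Client error: {e.Reason}"))
                .Build())
            {
                try
                {
                    // Awaiting the task keeps the producer alive until the broker
                    // has acknowledged the message or the delivery has failed.
                    var result = await producer.ProduceAsync(
                        "mytopicname",
                        new Message<string, string> { Value = "a log message" });

                    Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> ex)
                {
                    // Thrown when the message could not be delivered.
                    Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
                }
            }
        }
    }
}
```

If the console prints a delivery failure or a client error, the reason text usually points at the cause, such as the broker not being reachable from the machine running the app.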