Tags: apache-kafka, kafka-consumer-api, librdkafka, confluent-kafka-dotnet

Simple Confluent Kafka .NET client fails to connect to a simple build of an Apache Kafka broker


Any hints on how to get a simple .NET Kafka client to connect to a broker?

I built Apache Kafka from the 3.4.0 download and followed the quick start successfully (topic created, events produced, events consumed).

Using Confluent.Kafka v2.1.1 I tried to read the topic metadata with:

using System;
using System.Linq;
using Confluent.Kafka;

// Point the admin client at the local broker and enable full librdkafka debug logging.
var adminConfig = new AdminClientConfig();
adminConfig.BootstrapServers = "localhost:9092";
adminConfig.Debug = "all";

using var adminClient = new AdminClientBuilder(adminConfig).Build();

// Request cluster metadata, waiting up to 10 seconds for the broker to respond.
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
var topicNames = metadata.Topics.Select(a => a.Topic).ToList();

Console.WriteLine($"Topics: {string.Join(',', topicNames)}");

The var metadata = ... line throws a Confluent.Kafka.KafkaException: 'Local: Broker transport failure'.
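For anyone hitting the same exception, here is a minimal sketch of catching it to see the underlying librdkafka error code and reason. It assumes the Confluent.Kafka package; Describe is a hypothetical helper name and is not part of the library.

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper: summarise the librdkafka error carried by the exception.
static string Describe(KafkaException e) =>
    $"code={e.Error.Code}, reason={e.Error.Reason}, fatal={e.Error.IsFatal}";

var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var adminClient = new AdminClientBuilder(adminConfig).Build();

try
{
    var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
    var topicNames = metadata.Topics.Select(t => t.Topic).ToList();
    Console.WriteLine("Topics: " + string.Join(", ", topicNames));
}
catch (KafkaException e)
{
    // When no broker answers, GetMetadata throws instead of returning metadata.
    Console.WriteLine("Metadata request failed: " + Describe(e));
}
```

The error code only shows that the transport failed. It does not say why, so the debug log is still needed alongside it.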

The debug log in the terminal shows the following sequence, which keeps repeating:

- xxx: Received CONNECT op
- xxx: Broker changed state INIT -> TRY_CONNECT
- xxx: Broadcasting state change
- xxx: broker in state TRY_CONNECT connecting
- xxx: Broker changed state TRY_CONNECT -> CONNECT
- xxx: Broadcasting state change
- xxx: Connecting to ipv6#[::1]:9092 (plaintext) with socket 1308
- xxx: Connected to ipv6#[::1]:9092
- xxx: localhost:9092/bootstrap: Connected (#12)
- xxx: localhost:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
- xxx: Broker changed state CONNECT -> APIVERSION_QUERY
- xxx: Broadcasting state change
- xxx: localhost:9092/bootstrap: Sent ApiVersionRequest (v3, 52 bytes @ 0, CorrId 12)
- xxx: Cluster connection already in progress: no cluster connection
- xxx: Cluster connection already in progress: no cluster connection
- xxx: Cluster connection already in progress: no cluster connection
- xxx: Cluster connection already in progress: no cluster connection
- xxx: Cluster connection already in progress: no cluster connection
- xxx: Cluster connection already in progress: no cluster connection
- xxx: Timed out ApiVersionRequest in flight (after 10115ms, timeout #0)
- xxx: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 10116ms in state APIVERSION_QUERY) (_TRANSPORT): identical to last error: error log suppressed
- xxx: Updated enabled protocol features -ApiVersion to
- xxx: localhost:9092/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
- xxx: Broadcasting state change
- xxx: Purging bufq with 0 buffers
- xxx: Purging bufq with 0 buffers
- xxx: Updating 0 buffers on connection reset
- xxx: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
- xxx: Broker changed state DOWN -> INIT
- xxx: Broadcasting state change
- xxx: Selected for cluster connection: no cluster connection (broker has 11 connection attempt(s))

Solution

  • The error message turned out to be misleading: a connection was NOT successfully made to the broker.

    From this article, I discovered that the localhost address does not always map correctly between WSL and Windows.

    I was able to reach akhq, running inside WSL, from Windows via localhost (http://localhost:8080/ui/local/topic), but the Kafka broker was not reachable the same way: BootstrapServers = "localhost:9092" produces the error, whereas BootstrapServers = "172.18.212.44:9092" succeeds.

    If anyone in the Kafka community sees this, perhaps the logging could be improved to help identify the issue. Specifically, the log line "- xxx: Broker changed state CONNECT -> APIVERSION_QUERY" seems to indicate that the connection succeeded and that the API version query is what failed, which is not the case.
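The localhost versus explicit IP comparison above can be sketched as a small check that tries each candidate address in turn and reports which one returns metadata. This is a rough sketch, assuming the Confluent.Kafka package. TryGetTopics is a hypothetical helper name, and the second address is the one from my machine, so it will differ elsewhere.

```csharp
using System;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper: returns true if a metadata request against the address succeeds.
static bool TryGetTopics(string bootstrapServers, TimeSpan timeout)
{
    var config = new AdminClientConfig { BootstrapServers = bootstrapServers };
    using var adminClient = new AdminClientBuilder(config).Build();
    try
    {
        var metadata = adminClient.GetMetadata(timeout);
        var topicNames = metadata.Topics.Select(t => t.Topic).ToList();
        Console.WriteLine(bootstrapServers + ": OK, topics: " + string.Join(", ", topicNames));
        return true;
    }
    catch (KafkaException e)
    {
        // The broker did not answer within the timeout; report why and move on.
        Console.WriteLine(bootstrapServers + ": failed (" + e.Error.Reason + ")");
        return false;
    }
}

// Addresses taken from the post; the second is specific to that machine (assumption elsewhere).
var candidates = new[] { "localhost:9092", "172.18.212.44:9092" };
foreach (var candidate in candidates)
{
    if (TryGetTopics(candidate, TimeSpan.FromSeconds(10)))
    {
        break; // stop at the first address that works
    }
}
```

On another machine, replace the second entry with whatever address the broker is actually reachable on.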