
How To Implement A Wait Retry Transient Fault Handling Policy?


I am fairly new to Kafka and Polly. I am seeking advice on how to implement failure resiliency when using the Admin Client with the Confluent Kafka .NET client. I use the Admin Client to create a topic, if it does not already exist, during startup of a Blazor Server web app.

Initially, I am trying to use Polly to implement a simple wait-and-retry policy, listed below. I expect it to retry a create-topic operation for a configurable number of attempts, with a short configurable delay between attempts. If all retry attempts are exhausted, a fatal error is signalled and the application exits gracefully.

Wait and Retry Policy

public static AsyncRetryPolicy BrokerWaitAndRetry(short retryCount, short waitSeconds, ILogger logger)
{
    var pauseBetweenFailures = TimeSpan.FromSeconds(waitSeconds);

    var retryPolicy = Policy
        .Handle<KafkaException>()
        .WaitAndRetryAsync(retryCount, i => pauseBetweenFailures, onRetry: (Exception e, TimeSpan time) =>
        {
            logger.LogInformation($"Retry attempt {time}");
        });

    return retryPolicy;
}

Using the Wait and Retry Policy With An Admin Client Instance To Create A Topic

var polly = KafkaFailurePolicies.BrokerWaitAndRetry(5, 7, _Logger);
await polly.ExecuteAsync(async () =>
{
  await client.CreateTopicsAsync(
    new TopicSpecification[] {
      new TopicSpecification {
        Name = _Config.Topic.Name,
        NumPartitions = _Config.Topic.PartitionCount,
        ReplicationFactor = _Config.Topic.ReplicationCount
      }
    },
    new CreateTopicsOptions
    {
        /** Set low request timeout for basic testing, otherwise would use a higher timeout of 60 seconds*/
        RequestTimeout = TimeSpan.FromSeconds(20)
    }
  );
});

Logs and Analysis

When I run this, the logs below show that a retry is attempted once, but no subsequent retries follow. The logs also show that the rdkafka library detected and suppressed an identical error.

I think this is why no subsequent retry attempts appear in the logs: the underlying rdkafka poll thread hides subsequent local timeout errors and keeps trying to connect to the broker. The .NET client then cannot throw an exception, because it is never notified of the failure. Does this mean that Polly is stuck awaiting the second attempt?

info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
      Starting async initialization
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
      Starting async initialization for WebApp.Kafka.Admin.KafkaAdminService
%3|1609337520.079|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 2ms in state CONNECT)
%3|1609337520.079|ERROR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down
%3|1609337520.082|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 2ms in state CONNECT)
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Admin service trying to create Kafka Topic...
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Topic::eventbus, ReplicationCount::1, PartitionCount::3
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Bootstrap Servers::localhost:9092
%3|1609337521.081|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1609337521.081|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1609337522.083|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)
%3|1609337522.083|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)
%3|1609337524.089|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337524.089|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337525.090|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337525.090|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337527.097|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337527.097|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337528.096|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337528.096|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337530.106|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337530.106|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337532.115|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337532.115|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337533.119|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337533.119|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337535.128|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337535.128|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337536.128|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337536.128|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337537.129|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337537.129|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337540.133|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337540.133|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
Exception thrown: 'Confluent.Kafka.KafkaException' in System.Private.CoreLib.dll
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Retry attempt 00:00:07
%3|1609337542.135|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337542.135|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337544.140|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337544.140|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337545.140|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337545.140|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337546.140|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337546.140|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337547.332|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337547.332|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337548.149|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337548.149|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337549.148|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337549.148|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337550.148|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337550.148|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337551.152|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1609337551.152|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1609337555.171|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337555.171|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337556.171|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337556.171|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337557.177|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337557.177|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337558.176|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337558.176|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337560.180|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337560.180|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337561.183|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337561.183|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337562.186|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337562.186|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337563.186|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337563.186|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337564.191|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337564.191|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337567.200|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337567.200|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337568.204|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337568.204|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337569.208|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337569.208|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337570.212|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)
%3|1609337570.212|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)
%3|1609337573.222|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337573.222|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337574.225|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337574.226|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337575.230|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337575.230|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337576.229|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337576.229|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337578.234|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337578.234|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337579.238|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337579.238|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337580.241|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337580.241|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337581.245|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1609337581.245|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%3|1609337583.254|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337583.254|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337584.259|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337584.259|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337585.264|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)
%3|1609337585.264|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 1ms in state CONNECT)
%3|1609337586.267|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337586.267|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337587.267|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337587.267|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337588.267|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337588.267|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337589.268|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337589.268|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337591.278|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337591.278|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337592.282|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337592.282|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337593.286|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1609337593.286|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)

The error thrown by the Confluent Kafka .NET library during the first attempt is ErrorCode.Local_TimedOut. I think this corresponds to rdkafka's local timeout error. Upon investigation I found AdminClient configuration properties relating to transient fault handling.

This led me to try Confluent Kafka's built-in transient fault handling:

var adminClientBuilder = new AdminClientBuilder(
/** https://github.com/confluentinc/confluent-kafka-dotnet/blob/2b54dd2ab07f6eaa886d88c57143838627b2512f/src/Confluent.Kafka/Config_gen.cs#L826 **/
    new AdminClientConfig()
    {
        BootstrapServers = _Config.Consumer.BootstrapServers,
        Debug = "all",
        SocketTimeoutMs = 20000, // reduced from 60 seconds for testing purposes
        SocketMaxFails = 5, // is this synonymous with 5 retries?
    }
 );

After waiting for a short while during startup, I started the Kafka broker. The debug logs below show that the rdkafka thread detects that the main broker is up and running. However, the create topics admin operation is still not performed.

Debug logs

Available for viewing on pastebin due to space restrictions.

Questions

Is it possible to configure rdkafka so that identical errors are not suppressed when trying to connect the AdminClient to the broker?

In the above example, once the rdkafka poll thread has eventually detected that the broker is up and running, why does the AdminClient create topics operation not complete?

How to use Confluent Kafka .NET with/without Polly to implement a retry and wait failure policy for a Kafka AdminClient that makes a request to create a topic on the broker?

Update

I created a small console application, available at pastebin.

If a new AdminClient is created for each retry attempt, and the broker is eventually started after a failure has been detected, then it works.

However, if the same AdminClient instance is reused for each retry attempt, and the broker is eventually started after a failure has been detected, then the program blocks. I think this is because the rdkafka library suppresses multiple local timeout errors for the client and only notifies Confluent Kafka .NET of the initial failure.

Is there a better way, rather than creating a new AdminClient instance upon each retry attempt?


Solution

  • After I filed an issue at the Confluent Kafka GitHub repository, it looks as though the problem described in this question is due to a confirmed bug in the Confluent Kafka .NET library.

    A workaround is suggested by the library's author here.

    Essentially, until the bug is fixed, a new AdminClient instance has to be created for each retry attempt.
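
As a rough illustration of that workaround, the retry loop can own the client's lifetime: build a fresh client at the start of each attempt and dispose it before the next one. The sketch below is library-agnostic and uses only the .NET base class library. FreshClientRetry, clientFactory, operation and isTransient are hypothetical names, not part of Polly or Confluent.Kafka. For the case in this question, the assumption is that the factory would be new AdminClientBuilder(config).Build() and the operation would be the CreateTopicsAsync call shown earlier.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of Polly or Confluent.Kafka).
public static class FreshClientRetry
{
    // Runs `operation` once plus up to `retryCount` retries. Every attempt gets
    // a brand-new client from `clientFactory`; the client is disposed when the
    // attempt ends, whether it succeeded or failed.
    public static async Task ExecuteAsync<TClient>(
        Func<TClient> clientFactory,
        Func<TClient, Task> operation,
        int retryCount,
        TimeSpan wait,
        Func<Exception, bool> isTransient,
        Action<Exception, int> onRetry = null)
        where TClient : IDisposable
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                using (var client = clientFactory())
                {
                    await operation(client);
                    return; // success: stop retrying
                }
            }
            // Once the retries are used up, or the error is not transient,
            // the filter is false and the exception propagates to the caller.
            catch (Exception e) when (attempt < retryCount && isTransient(e))
            {
                onRetry?.Invoke(e, attempt + 1);
            }

            // Only reached after a handled failure: pause before the next attempt.
            await Task.Delay(wait);
        }
    }
}

// Assumed usage with the question's AdminClient (requires the Confluent.Kafka
// package; `config`, `topicSpec` and `options` are placeholders):
//
// await FreshClientRetry.ExecuteAsync(
//     () => new AdminClientBuilder(config).Build(),
//     client => client.CreateTopicsAsync(new[] { topicSpec }, options),
//     retryCount: 5,
//     wait: TimeSpan.FromSeconds(7),
//     isTransient: e => e is KafkaException);
```

The same idea carries over to the Polly policy from the question: move the AdminClientBuilder(...).Build() call, wrapped in a using block, into the delegate passed to ExecuteAsync, so that each retry runs against a new instance instead of the one that already reported a failure.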