Tags: java, spring, apache-kafka, spring-kafka

kafka async message producer: retry does not work


I am learning how to use Kafka with Spring for Apache Kafka. My focus at the moment is only on the producer.

On the server side, I have two Kafka nodes running on my local machine, Kafka version: 3.8.1.

On the client side I use spring-kafka v3.3.1. I use JsonSerializer.class for the value and a non-blocking (async) send, because I do not want to block the producer until the acks arrive. I process the result (just logging, nothing else) in the future.whenComplete(...) callback, as described in the official doc.

The message producer I created works fine: I can send messages to the Kafka topic and I see them correctly with a Kafka topic-browser tool (KafkIO). I configured the producer retries based on several docs and articles, for example this doc. Unfortunately I cannot see anything in the log related to retries, and I think Spring/Kafka does not retry in case of failure.

My test flow is the following:

  1. Start my Kafka servers.
  2. Start my Spring app with the message producer.
  3. Send a few messages to the topic; it works like a charm, the messages appear on the topic.
  4. Stop both running Kafka servers on my localhost.
  5. Try to send a new message to the topic.
  6. I expect Spring/Kafka to retry, but no retry happens.

I see the exception thrown once in the log BUT I cannot see any retry:

org.springframework.kafka.core.KafkaProducerException: Failed to send
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation
...

What am I missing in my code? Why does the retry not work in case of an error?

This is my producer config:

private Map<String, Object> producerConfiguration() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);

    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    configs.put(ProducerConfig.ACKS_CONFIG, "all");
    configs.put(ProducerConfig.RETRIES_CONFIG, "2147483647");
    configs.put(ProducerConfig.LINGER_MS_CONFIG, "0");
    configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000");
    configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
    configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
    configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, "5000");
    return configs;
}
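
For reference, as far as I understand the Kafka client requires delivery.timeout.ms to be at least linger.ms + request.timeout.ms, and every send attempt plus its retries has to fit into that window. With delivery.timeout.ms=15000 and request.timeout.ms=10000 there is room for roughly one attempt and maybe a single retry, so even against a reachable broker I would rarely see a retry in the log. A minimal sketch with wider, illustrative values (the timeoutRelatedSettings() helper is hypothetical, and the numbers are close to the client defaults, not recommendations):

// Sketch only: a wider delivery window so that several request attempts (and
// reconnects) can happen before a batch expires. Values are illustrative.
private Map<String, Object> timeoutRelatedSettings() {
    Map<String, Object> configs = new HashMap<>();
    // total time a record may spend in the producer (batching, waiting for a
    // connection, retries); must be >= linger.ms + request.timeout.ms
    configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
    // how long a single produce request may wait for a broker response
    configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
    // pause between retries of a failed request
    configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
    return configs;
}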

Factory and KafkaTemplate:

@Bean
public ProducerFactory<String, Event> producerFactory() {
    DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(producerConfiguration());
    factory.setProducerPerThread(false);
    return factory;
}

@Bean
public KafkaTemplate<String, Event> kafkaTemplate() {
    var factory = producerFactory();
    log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
    return new KafkaTemplate<>(factory);
}

This is how I create the topic:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic() {
    return TopicBuilder.name("incoming")
            .partitions(3)
            .replicas(2)
            .build();
}

And finally, this is how I send the message:

public void onSend(Event event) {
    try {
        log.debug("sending message to kafka: {topic: \"{}\", payload: {}}", kafkaTopic, event);
        ProducerRecord<String, Event> record = new ProducerRecord<>(kafkaTopic, event);
        kafkaTemplate.send(record).whenComplete((result, ex) -> {
            if (ex == null) {
                log.info(
                        "message successfully sent to kafka: {topic: \"{}\", partition: {}, offset: {}, key: \"{}\", value: \"{}\"}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset(),
                        result.getProducerRecord().key(),
                        result.getProducerRecord().value());
            } else {
                var cause = ex.getCause();
                var isRetryable = cause instanceof RetriableException;
                log.error(
                        "failed to send message to kafka: {topic: \"{}\", event: {}, error: {}, isRetryable: {}}",
                        kafkaTopic,
                        event.toString(),
                        cause.getMessage(),
                        isRetryable,
                        ex);
            }
        });
    } catch (Throwable ex) {
        log.error(
                "Unable to send message to kafka topic: {topic: \"{}\", event: {}}",
                kafkaTopic,
                event.toString(),
                ex);
    }
}
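
For completeness: if a message has to survive a broker outage longer than delivery.timeout.ms, I assume the client alone cannot guarantee that, so the failure would have to be handled at the application level. A rough sketch of what I have in mind (the resendLater(...) helper is hypothetical and not part of the code above):

// Sketch only: hand the event back to an application-level buffer when the async
// send fails with a retriable cause; resendLater(...) is hypothetical.
public void onSendWithFallback(Event event) {
    ProducerRecord<String, Event> record = new ProducerRecord<>(kafkaTopic, event);
    kafkaTemplate.send(record).whenComplete((result, ex) -> {
        if (ex == null) {
            log.info("message delivered to partition {}", result.getRecordMetadata().partition());
        } else if (ex.getCause() instanceof RetriableException) {
            // the broker may simply be unreachable; keep the event and try again later
            resendLater(event);
        } else {
            log.error("non-retriable failure, giving up on event: {}", event, ex);
        }
    });
}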

And the resulting log:

2025-01-31T13:19:52.183Z  INFO 203 --- [gombi-kafka-producer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8443 (https)
2025-01-31T13:19:52.209Z  INFO 203 --- [gombi-kafka-producer] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2025-01-31T13:19:52.210Z  INFO 203 --- [gombi-kafka-producer] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.34]
2025-01-31T13:19:52.271Z  INFO 203 --- [gombi-kafka-producer] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2025-01-31T13:19:52.272Z  INFO 203 --- [gombi-kafka-producer] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3738 ms
2025-01-31T13:19:53.048Z DEBUG 203 --- [gombi-kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : initializing a KafkaTemplate using the following setting: {"retries": "2147483647", "enable.idempotence": "true", "retry.backoff.max.ms": "5000", "value.serializer": "class org.springframework.kafka.support.serializer.JsonSerializer", "request.timeout.ms": "10000", "acks": "all", "bootstrap.servers": "kafka-1.hello.com:9092, kafka-2.hello.com:9092", "delivery.timeout.ms": "15000", "retry.backoff.ms": "1000", "key.serializer": "class org.apache.kafka.common.serialization.StringSerializer", "linger.ms": "0"}


<< creating the topic>>

2025-01-31T13:19:53.181Z DEBUG 203 --- [gombi-kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : creating a new kafka topic: {name: "incoming", partitions: 3, replicas: 2}
2025-01-31T13:19:55.083Z  WARN 203 --- [gombi-kafka-producer] [           main] iguration$LoadBalancerCaffeineWarnLogger : Spring Cloud LoadBalancer is currently working with the default cache. While this cache implementation is useful for development and tests, it's recommended to use Caffeine cache in production.You can switch to using Caffeine cache, by adding it and org.springframework.cache.caffeine.CaffeineCacheManager to the classpath.
2025-01-31T13:19:55.103Z  INFO 203 --- [gombi-kafka-producer] [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 3 endpoints beneath base path '/actuator'
2025-01-31T13:19:55.162Z  INFO 203 --- [gombi-kafka-producer] [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.controllers = []
bootstrap.servers = [kafka-1.hello.com:9092, kafka-2.hello.com:9092]
client.dns.lookup = use_all_dns_ips
client.id = gombi-kafka-producer-admin-0
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
enable.metrics.push = true
metadata.max.age.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 100
...
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
...

2025-01-31T13:19:55.306Z  INFO 203 --- [gombi-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-01-31T13:19:55.306Z  INFO 203 --- [gombi-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-01-31T13:19:55.307Z  INFO 203 --- [gombi-kafka-producer] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1738329595305
2025-01-31T13:19:55.637Z  WARN 203 --- [gombi-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient     : [AdminClient clientId=gombi-kafka-producer-admin-0] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics.
2025-01-31T13:19:55.924Z  INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for gombi-kafka-producer-admin-0 unregistered
2025-01-31T13:19:55.929Z  INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2025-01-31T13:19:55.930Z  INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025-01-31T13:19:55.931Z  INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2025-01-31T13:19:56.088Z  INFO 203 --- [gombi-kafka-producer] [           main] o.a.t.util.net.NioEndpoint.certificate   : Connector [https-jsse-nio-8443], TLS virtual host [_default_], certificate type [UNDEFINED] configured from keystore [/root/.keystore] using alias [kafka-producer-service-1.hello.com] with trust store [null]
2025-01-31T13:19:56.103Z  INFO 203 --- [gombi-kafka-producer] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8443 (https) with context path '/'
2025-01-31T13:19:56.112Z  INFO 203 --- [gombi-kafka-producer] [           main] o.s.c.c.s.ConsulServiceRegistry          : Registering service with consul: NewService{id='gombi-kafka-producer', name='gombi-kafka-producer', tags=[service, 0.2.0], address='kafka-producer-service-1.hello.com', meta={secure=true}, port=8443, enableTagOverride=null, check=Check{script='null', dockerContainerID='null', shell='null', interval='2s', ttl='null', http='https://kafka-producer-service-1.hello.com:8443/actuator/health', method='null', header={}, tcp='null', timeout='2s', deregisterCriticalServiceAfter='10s', tlsSkipVerify=null, status='null', grpc='null', grpcUseTLS=null}, checks=null}
2025-01-31T13:19:56.155Z  INFO 203 --- [gombi-kafka-producer] [           main] c.r.g.s.message.producer.Application     : Started Application in 13.263 seconds (process running for 14.104)


<< sending the 1st message >>

2025-01-31T13:20:10.047Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-2] c.r.g.c.m.MethodStatisticsAspect         : > calling the KafkaProducerController.sendOneMessage()...
2025-01-31T13:20:10.047Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-2] c.r.g.s.m.p.s.KafkaProducerService       : sending message to kafka: {topic: "incoming", payload: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:10.047"}
2025-01-31T13:20:10.056Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [kafka-1.hello.com:9092, kafka-2.hello.com:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = gombi-kafka-producer-producer-1
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
connections.max.idle.ms = 540000
delivery.timeout.ms = 15000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
...
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 10000
retries = 2147483647
retry.backoff.max.ms = 5000
retry.backoff.ms = 1000
...
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
...
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2025-01-31T13:20:10.077Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
2025-01-31T13:20:10.087Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=gombi-kafka-producer-producer-1] Instantiated an idempotent producer.
2025-01-31T13:20:10.103Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.8.1
2025-01-31T13:20:10.104Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 70d6ff42debf7e17
2025-01-31T13:20:10.104Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1738329610103
2025-01-31T13:20:10.104Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] c.r.g.s.m.p.c.KafkaConfiguration         : ProducerFactory > PostProcessor > apply
2025-01-31T13:20:10.105Z  INFO 203 --- [gombi-kafka-producer] [nio-8443-exec-2] c.r.g.s.m.p.c.KafkaConfiguration         : ProducerFactory > listener: message producer has been added, id: producerFactory.gombi-kafka-producer-producer-1
2025-01-31T13:20:10.116Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=gombi-kafka-producer-producer-1] Cluster ID: h9gtq9ZHTDyTctZ0WyukDA
2025-01-31T13:20:10.140Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-2] c.r.g.c.m.MethodStatisticsAspect         : < call ended: {name: KafkaProducerController.sendOneMessage, return: "A message has been sent to the <b>incoming</b> Kafka topic.", execution-in-ms: 90}
2025-01-31T13:20:12.127Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=gombi-kafka-producer-producer-1] ProducerId set to 1000 with epoch 0
2025-01-31T13:20:12.189Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] c.r.g.s.m.p.s.KafkaProducerService       : message successfully sent to kafka: {topic: "incoming", partition: 2, offset: 0, key: "null", value: "{sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:10.047""}


<< sending more messages >>

2025-01-31T13:20:13.302Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.c.m.MethodStatisticsAspect         : > calling the KafkaProducerController.sendOneMessage()...
2025-01-31T13:20:13.302Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.s.m.p.s.KafkaProducerService       : sending message to kafka: {topic: "incoming", payload: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:13.302"}
2025-01-31T13:20:13.303Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.c.m.MethodStatisticsAspect         : < call ended: {name: KafkaProducerController.sendOneMessage, return: "A message has been sent to the <b>incoming</b> Kafka topic.", execution-in-ms: 1}
2025-01-31T13:20:13.311Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] c.r.g.s.m.p.s.KafkaProducerService       : message successfully sent to kafka: {topic: "incoming", partition: 2, offset: 1, key: "null", value: "{sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:13.302""}

2025-01-31T13:20:13.821Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-9] c.r.g.c.m.MethodStatisticsAspect         : > calling the KafkaProducerController.sendOneMessage()...
2025-01-31T13:20:13.822Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-9] c.r.g.s.m.p.s.KafkaProducerService       : sending message to kafka: {topic: "incoming", payload: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:13.822"}
2025-01-31T13:20:13.823Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-9] c.r.g.c.m.MethodStatisticsAspect         : < call ended: {name: KafkaProducerController.sendOneMessage, return: "A message has been sent to the <b>incoming</b> Kafka topic.", execution-in-ms: 1}


<< stopping kafka servers >>

2025-01-31T13:20:19.350Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Node -1 disconnected.
2025-01-31T13:20:31.718Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Node 2 disconnected.
2025-01-31T13:20:31.720Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Node -2 disconnected.


<< sending a new message, expecting to see retry here >>

2025-01-31T13:20:36.805Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.c.m.MethodStatisticsAspect         : > calling the KafkaProducerController.sendOneMessage()...
2025-01-31T13:20:36.805Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.s.m.p.s.KafkaProducerService       : sending message to kafka: {topic: "incoming", payload: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:36.805"}
2025-01-31T13:20:36.806Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.c.m.MethodStatisticsAspect         : < call ended: {name: KafkaProducerController.sendOneMessage, return: "A message has been sent to the <b>incoming</b> Kafka topic.", execution-in-ms: 1}
2025-01-31T13:20:41.043Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 9314 ms.
2025-01-31T13:20:46.218Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 9375 ms.
2025-01-31T13:20:51.808Z ERROR 203 --- [gombi-kafka-producer] [ucer-producer-1] c.r.g.s.m.p.s.KafkaProducerService       : failed to send message to kafka: {topic: "incoming", event: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:36.805", error: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation, isRetryable: true}

org.springframework.kafka.core.KafkaProducerException: Failed to send
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$9(KafkaTemplate.java:891) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1111) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1567) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:426) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:347) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) ~[kafka-clients-3.8.1.jar!/:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation

2025-01-31T13:20:51.811Z ERROR 203 --- [gombi-kafka-producer] [ucer-producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", create...' to topic incoming:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation


<< the following log lines continue endlessly >>

2025-01-31T13:21:08.288Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 21930 ms.
2025-01-31T13:21:33.112Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 19813 ms.
2025-01-31T13:22:02.355Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 24229 ms.
2025-01-31T13:22:37.368Z  INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 30000 ms.
...

You can check the source code here:


Solution

  • Resend (retry) works only if you have a connection to the broker and something goes wrong while a message is being sent.

    So, if your broker is dead, there is no way to send the message at all: there is no connection. That is what the exception is about.
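
  • If you want to actually see the producer retrying, you need a produce request that reaches a broker and fails with a retriable error, not a dead cluster.

    One way to provoke that with this two-node setup (a sketch only; the min.insync.replicas value is my assumption, reusing the TopicBuilder style from the question): require both replicas to be in sync and then stop just one broker. With acks=all the leader rejects the write with NOT_ENOUGH_REPLICAS, which is retriable, so the client keeps retrying until the second broker comes back or delivery.timeout.ms expires. With the whole cluster down, by contrast, the batch never leaves the accumulator; all you see are the NetworkClient reconnect attempts and then the TimeoutException once delivery.timeout.ms (15 s) has passed.

    // Sketch: a topic that needs both replicas in sync for acks=all writes, so that
    // stopping a single broker yields a retriable NOT_ENOUGH_REPLICAS error and the
    // producer's retries become visible in the log.
    @Bean
    public NewTopic retryDemoTopic() {
        return TopicBuilder.name("incoming-retry-demo")
                .partitions(3)
                .replicas(2)
                .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
                .build();
    }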