Scenario/Use Case: I have a Spring Boot application that uses Spring for Apache Kafka to send messages to Kafka topics. When a specific event completes (triggered by an HTTP request), a new thread is started (via Spring @Async) that calls KafkaTemplate.send() and registers a callback on the ListenableFuture it returns. The original thread that handled the HTTP request returns a response to the calling client and is freed.
Normal Behavior: Under normal application load I've verified that the individual messages are all published to the topic as desired (application log entries on callback success or failure, as well as viewing the messages in the topic on the Kafka cluster). If I bring down all Kafka brokers for 3-5 minutes and then bring the cluster back online, the application's publisher immediately re-establishes its connection to Kafka and proceeds with publishing messages.
Problem Behavior: However, when performing load testing, if I bring down all Kafka brokers for 3-5 minutes and then bring the cluster back online, the Spring application's publisher continues to show failures for all publish attempts. This continues for approximately 7 hours, at which point the publisher is able to successfully re-establish communication with Kafka again (usually this is preceded by a broken pipe exception, but not always).
Current Findings: While performing the load test, for approximately 10 minutes, I connected to the application using JConsole and monitored the producer metrics exposed via kafka.producer. Within roughly the first 30 seconds of heavy load, buffer-available-bytes decreases until it reaches 0 and stays there. waiting-threads remains between 6 and 10 (it alternates every time I hit refresh), and buffer-available-bytes remains at 0 for approximately 6.5 hours. After that, buffer-available-bytes shows all of the originally allocated memory restored, but Kafka publish attempts continue failing for approximately another 30 minutes before the desired behavior finally returns.
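(For anyone who wants to watch the same metric without JConsole, the value can also be read in-process via KafkaTemplate.metrics(). The snippet below is only an illustrative sketch, not my actual code; it assumes scheduling is enabled in the application and that the template bean is injectable.)

```java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ProducerBufferMonitor {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ProducerBufferMonitor(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Log the producer's remaining buffer memory every 30 seconds.
    @Scheduled(fixedRate = 30_000)
    public void logBufferAvailableBytes() {
        Map<MetricName, ? extends Metric> metrics = kafkaTemplate.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            if ("buffer-available-bytes".equals(entry.getKey().name())) {
                System.out.println("buffer-available-bytes = " + entry.getValue().metricValue());
            }
        }
    }
}
```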
Current Producer Configuration:
request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000
All other properties are using their default values
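(Note: the names above are shorthand; the corresponding Kafka client properties are request.timeout.ms, retries, max.in.flight.requests.per.connection, max.block.ms, and retry.backoff.ms. A rough, illustrative sketch of how these values could be wired up in Java config looks like this; the bootstrap servers and String serializers are placeholders, not my actual setup:)

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        props.put(ProducerConfig.RETRIES_CONFIG, 2);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_CONFIG, 1);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000L);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000L);
        // buffer.memory is left at its default (32 MB), which is the pool that
        // buffer-available-bytes reports on.
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```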
Questions:
Update: To clarify thread usage: I'm using a single producer instance, as recommended in the JavaDocs. There are threads such as https-jsse-nio-22443-exec-* which handle incoming HTTPS requests. When a request comes in, some processing occurs, and once all non-Kafka-related logic completes, a call is made to a method in another class decorated with @Async. That method makes the call to KafkaTemplate.send(). The response back to the client is shown in the logs before the publish to Kafka is performed (this is how I'm verifying it's being performed on a separate thread, since the service doesn't wait for the publish before returning a response). There are task-scheduler-* threads which appear to be handling the callbacks from KafkaTemplate.send(). My guess is that the single kafka-producer-network-thread handles all of the publishing.
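Roughly, the @Async publish path looks like the sketch below; the class, method, and topic names are made up for illustration, but the send-plus-callback shape matches what I described:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class EventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Called from the HTTPS request thread; runs on the @Async executor so the
    // controller can return its response without waiting on the publish.
    @Async
    public void publish(String topic, String payload) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, payload);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Success is logged; the callback runs on a Spring task-scheduler thread.
                System.out.println("Published to " + result.getRecordMetadata().topic());
            }

            @Override
            public void onFailure(Throwable ex) {
                // On failure the message is handed off to the dead-letter path.
                System.err.println("Publish failed: " + ex.getMessage());
            }
        });
    }
}
```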
My application was making an HTTP request to send each message to a dead-letter table on a database platform whenever a Kafka publish failed. The same threads being spun up to perform the publish to Kafka were being re-used for this database call. I moved the database call logic into another class and decorated it with its own @Async and a custom TaskExecutor. After doing this, I've monitored JConsole and can see that the calls to Kafka appear to be re-using the same 10 threads (TaskExecutor: core pool size 10, queue capacity 0, max pool size 80), while the calls to the database service now use a separate thread pool (TaskExecutor: core pool size 10, queue capacity 0, max pool size 80), which is consistently closing and opening new threads but staying at a relatively constant thread count. With this new behavior, buffer-available-bytes remains at a healthy, constant level and the application's Kafka publisher successfully re-establishes its connection once the brokers are brought back online.
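For reference, the split into two executors looks roughly like the sketch below; the bean names and thread-name prefixes are made up, but the pool sizes mirror what I described above:

```java
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncExecutorConfig {

    // Executor backing the Kafka publish @Async methods.
    @Bean(name = "kafkaPublishExecutor")
    public Executor kafkaPublishExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(80);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("kafka-publish-");
        executor.initialize();
        return executor;
    }

    // Separate executor for the dead-letter database calls, so a slow or failing
    // database path can no longer tie up the threads that publish to Kafka.
    @Bean(name = "deadLetterExecutor")
    public Executor deadLetterExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(80);
        executor.setQueueCapacity(0);
        executor.setThreadNamePrefix("dead-letter-");
        executor.initialize();
        return executor;
    }
}
```

The publish method is then annotated with @Async("kafkaPublishExecutor") and the dead-letter database method with @Async("deadLetterExecutor"), so Spring routes each onto its own pool.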