Tags: java, asynchronous, spring-kafka, forkjoinpool

What is the maximum number of async threads created for KafkaTemplate async responses?


"A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors."

Suppose my CPU has 2 cores. Does that mean the maximum number of threads created by the ForkJoinPool is 4?

Suppose I perform an asynchronous operation that returns a future inside a loop (say, 10k iterations), and it uses the default ForkJoinPool. How many threads will the ForkJoinPool create?

// kafkaTemplate, topicName and message are defined elsewhere in the class.
List<ListenableFuture<SendResult<String, String>>> cf = new ArrayList<>();

int i = 0;
while (i < 10_000) {
    // send() returns immediately; the future completes once the broker acknowledges the record
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
    cf.add(future);

    i++;

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("sent success");
        }

        @Override
        public void onFailure(Throwable ex) {
            System.out.println("sending failed");
        }
    });
}

And, in some other class, I check whether all of the futures have completed:

    for (ListenableFuture<SendResult<String, String>> m : myFutures) {
        m.get();
    }
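
On the quoted ForkJoinPool sentence: the parallelism level is a target for the number of concurrently active workers, not a multiple of the core count, and worker threads are started lazily as work arrives. A minimal sketch that checks this against the plain JDK follows; the class and method names are hypothetical, and it does not involve KafkaTemplate at all.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; only JDK calls are used.
class ForkJoinParallelismDemo {

    /** Parallelism of a pool built with the no-arg constructor. */
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    /** Number of worker threads started by a pool that has been given no work. */
    static int idlePoolSize() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("processors  = " + Runtime.getRuntime().availableProcessors());
        System.out.println("parallelism = " + defaultParallelism());
        System.out.println("idle size   = " + idlePoolSize());
    }
}
```

On a 2-core machine the first two values would both be 2, not 4. The pool may still add threads beyond that target temporarily when workers block, so parallelism is not a hard cap.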

Solution

  • There is no additional threading; the futures are completed on the producer's I/O thread.

    Here is a test that shows which thread the callbacks run on:

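    import java.util.stream.IntStream;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.ProducerListener;

    @SpringBootApplication
    public class So61415751Application {

        private static final Logger LOG = LoggerFactory.getLogger(So61415751Application.class);

        public static void main(String[] args) {
            SpringApplication.run(So61415751Application.class, args);
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            // The log line written here shows the name of the thread that invokes the listener.
            template.setProducerListener(new ProducerListener<String, String>() {
                @Override
                public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
                    LOG.info(recordMetadata.toString());
                }
            });
            return args -> {
                // Send 9 records from the main thread; send() does not block.
                IntStream.range(0, 9).forEach(i -> template.send("so61415751", "foo" + i));
                LOG.info("Sent");
                // Keep the application alive long enough for the lingering batch to be sent.
                Thread.sleep(10_000);
            };
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so61415751").partitions(1).replicas(1).build();
        }

    }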
    
    spring.kafka.producer.properties.linger.ms=3000
    
    #logging.level.org.springframework.kafka=debug
    
    logging.level.org.apache.kafka=debug
    

    Result:

    2020-04-24 17:27:46.282  INFO 96084 --- [           main] com.example.demo.So61415751Application   : Sent
    
    ...
    
    3 second linger
    
    ...
    
    2020-04-24 17:27:49.299  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@63
    2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@64
    2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@65
    2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@66
    2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@67
    2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@68
    2020-04-24 17:27:49.300  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@69
    2020-04-24 17:27:49.301  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@70
    2020-04-24 17:27:49.301  INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application   : so61415751-0@71
    

    (The thread that calls the ProducerListener also completes the future.)
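
The same "callbacks run on the completing thread" behaviour can be reproduced without a broker. The sketch below is only an analogy: it uses plain `CompletableFuture` rather than KafkaTemplate, and a single executor thread named `fake-producer-io` (a made-up name) stands in for the producer's I/O thread. It registers callbacks on 10k futures, completes them all from that one thread, and collects the names of the threads the callbacks ran on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it does not use Kafka or Spring.
class CompletionThreadDemo {

    // Made-up name for the single thread that stands in for the producer's I/O thread.
    static final String IO_THREAD_NAME = "fake-producer-io";

    /** Returns the names of all threads on which the callbacks were executed. */
    static Set<String> callbackThreads(int count) throws Exception {
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, IO_THREAD_NAME));
        Set<String> seen = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        List<CompletableFuture<Void>> callbacks = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                CompletableFuture<String> future = new CompletableFuture<>();
                pending.add(future);
                // Non-async callback registered before completion:
                // it runs on whichever thread later completes the future.
                callbacks.add(future.thenAccept(
                        value -> seen.add(Thread.currentThread().getName())));
            }
            // Complete every future from the single stand-in I/O thread.
            ioThread.submit(() -> pending.forEach(f -> f.complete("ack")))
                    .get(10, TimeUnit.SECONDS);
            // Wait until every callback has finished before reading the result.
            CompletableFuture.allOf(callbacks.toArray(new CompletableFuture[0]))
                    .get(10, TimeUnit.SECONDS);
            return seen;
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callbackThreads(10_000));
    }
}
```

The number of futures does not affect the number of threads here: all callbacks share the one completing thread, which is also why slow work inside a callback would delay every callback queued behind it.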