Search code examples
javafuturespring-kafkakafka-producer-api

Wait for List of ListenAbleFuture returned by Kafka Send API


I have the List of ListenAbleFuture.I want to wait of this List of ListenableFuture<SendResult<Integer, String>> for atmost 15 minutes if they have not comepleted. How can i achieve it.

Currently i am doing this but this wait for 15 min for every ListenAbleFuture which is what i dont want.

 for (ListenableFuture<SendResult<Integer, String>> m : myFutureList) {

                    m.get(15, TimeUnit.MINUTES) ;
    }

ListenableFuture<SendResult<Integer, String>> is from import org.springframework.util.concurrent.ListenableFuture;

I have gone through Waiting on a list of Future but this solution is for completablefuture


Solution

  • Create a CountDownLatch, e.g. new CountDownLatch(50), add a listener to each listenable future and count the latch down in each one. You can use the same listener for all the futures rather than creating a new one each time.

    Then, after sending 50 records, use latch.await(10, TimeUnit.SECONDS). If it times out you can then iterate over your futures to figure out which one(s) are not complete.

    EDIT

    @Component
    class Sender {
    
        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
    
        public void sendThem(KafkaTemplate<String, String> template, List<String> toSend) throws InterruptedException {
            List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
            CountDownLatch latch = new CountDownLatch(toSend.size());
            ListenableFutureCallback<SendResult<String, String>> callback =
                    new ListenableFutureCallback<SendResult<String, String>>() {
    
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    LOG.info(result.getRecordMetadata().toString());
                    latch.countDown();
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    ProducerRecord<?, ?> producerRecord = ((KafkaProducerException) ex).getProducerRecord();
                    LOG.error("Failed; " + producerRecord, ex);
                    latch.countDown();
                }
            };
            toSend.forEach(str -> {
                 ListenableFuture<SendResult<String, String>> future = template.send("so61490633", str);
                 future.addCallback(callback);
            });
            if (latch.await(10, TimeUnit.SECONDS)) {
                LOG.info("All sent ok");
            }
            else {
                for (int i = 0; i < toSend.size(); i++) {
                    if (!futures.get(i).isDone()) {
                        LOG.error("No send result for " + toSend.get(i));
                    }
                }
            }
        }
    
    }