Search code examples
javaspring-bootspring-kafkacompletable-future

CompletableFuture with Kafka's callback methods?


We're building a small wrapper library around Kafka. We're using older version where there is no .whenComplete((response, throwable) -> {...}); when calling kafkaTemplate.send(...);.

Instead our library is doing following:

1 Defining a RetryTemplate bean:

@Bean
public RetryTemplate prodRetryTemplate() {    
   final RetryTemplate template = RetryTemplate.builder().maxAttempts(3)
         .retryOn(TopicAuthorizationException.class).exponentialBackoff(1000, 2, 2000).build();
   return template;
}

2 Defining a asynchronous CompletableFuture method:

@Override
@Async(EXECUTOR_SERVICE) //we had defined Executor bean, it is ommited here
public CompletableFuture<DtoResponse> asyncSend(final Integer key, final String topic, final String message)
      throws Exception {

   final ProducerRecord<K, V> producerRecord = buildRecord(key, topic, message);

   DtoRespons[] responseHolder = new DtoRespons[1];
   template.execute(ctx ->{

            final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
            future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

               @Override
               public void onSuccess(final SendResult<Integer, String> result) {
                  log.info("success...");
                  responseHolder[0] = DtoResponse.builder()
                        .error(null)
                        .success(true)
                        .msg("PUBLISHED")
                        .build();
               }

               @SneakyThrows
               @Override
               public void onFailure(final Throwable ex) {
                  log.error("error...)
                  throwError(producerRecord, eventName, ex); //do additional logging
               }
            });

      return null; //to satisfy execute method from RetryTemplate
   });

   return CompletableFuture.completedFuture(responseHolder[0]);
}

My question is straightforward, is it possible that once asyncSend is called that it returns null i.e. will line return CompletableFuture.completedFuture(null); be executed before onSuccess or onFailure are invoked?

That is, from client side:

public void myService(){  
  MyLib lib = new MyLib();
  CompletableFuture<DtoResponse> future = lib.sendAsync(key, topic, message);
  future.whenComplete((result, throwable) -> { 
     //is it possible that result and throwable both be null ??

    });
}

Update 1:

Based on professor's Gary comment. I am not sure if this is what you meant:

@Override
@Async(EXECUTOR_SERVICE)
public CompletableFuture<SendResult<Integer, String>> asyncSend(final Integer key, final String topic, final String message)
      throws Exception {
   final ProducerRecord<Integer, String> producerRecord = buildRecord(key, topic, message); //this can throw ex

   ListenableFuture<SendResult<Integer, String>> x = template.execute(ctx -> {
      final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);

      return future;
   });
   CompletableFuture<SendResult<Integer, String>> completable = x.completable();

   return completable;
}

Then the client code would have to check for throwable and for SendResult<Integer, String> within .whenComplete(). But this is not what I need. With SendResult<Integer, String> client code still cannot tell whether messages is sent or not. How to circumvent this? Please let me know if I've explained this in understandable way. Also, our library needs to do important logging in onFailure and onSuccess

Update 2:

After Gary's EDIT.

Thanks Gary for helping. However, at first this works, but problem occurs when exception as thrown. In that case whenComplete is not invoked, neither is exceptionally channel invoked. Let me explain.

So since we use secure Kafka in our .yaml file there are these properties:

 properties:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "OAUTHBEARER"
  sasl.login.callback.handler.class: "path.to.LoginCallBack"
  sasl.jaas.config: >-
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
      required
      oauth.token.endpoint.uri="sso-uri"
      oauth.ssl.endpoint.identification.algorithm=""
      oauth.ssl.truststore.type="JKS"
      oauth.client.id="kafka-client-id"
      oauth.client.secret="someSecret"
      oauth.username="username"
      oauth.password="password";

I had changed value oauth.client.id to dummy in order to provoke org.apache.kafka.common.KafkaException: Failed to construct kafka producer and to see where the flow will end up.

When line kafkaTemplate.send("so75823718", "foo"); is reached, it throws exception and that exception is not ex in whenComplete

    @Bean
ApplicationRunner runner(Foo foo) {
  return args -> {
        try {
            CompletableFuture<SendResult<String, String>> future 
             = foo.asyncSend("foo");
            future.whenComplete((sr, ex) -> {
                if (sr != null){
                    System.out.println("success");
                } else if (ex != null){
                    System.out.println("fail");
                }
            });
        } catch (Exception e){
            System.out.println("not expected nor desired behavior 
            to have exception here");
            e.printStackTrace();
        }
    }
}

I've tried couple of things, but not worth of showing since they don't even compile. So my question is now, is it somehow possible to not force users of asyncSend to use try/catch instead just to use whenComplete() or acceptThen/exceptionally and that all error handling is done in asyncSend?

UPDATE 3:

Really hopping that is a final edit.

So I think I managed it like so:

First, template

@Value("${kafkaExceptions:}")
private List<String> exceptions = new ArrayList<>();

    @Bean
    public RetryTemplate template() throws ClassNotFoundException {
       exceptions.add(TopicAuthorizationException.class.getName());
       RetryTemplateBuilder retryTemplateBuilder = RetryTemplate.builder().maxAttempts(retries).retryOn(TopicAuthorizationException.class);
       for (String exceptionClassName : exceptions){
          retryTemplateBuilder.retryOn((Class<? extends Throwable>) Class.forName(exceptionClassName));
       }
       return retryTemplateBuilder.exponentialBackoff(1000, 2, 2000).build();
    }

Then,

@Async("exec")
public CompletableFuture<SendResult<K, V>> asyncSend(K key, String topic, V message, S) throws Exception {

   return template.execute(ctx -> {

      try {
         ProducerRecord<K, V> producerRecord = buildHeader(key, topic, msg); //can throw custom exception

         return kafkaTemplate.send(producerRecord).completable();
      } catch (Exception e){
         
         if (reThrow(e)) throw e;

         return CompletableFuture.failedFuture(new CustomException(Enum.KAFKA_EX_PROD_PUBLISH_ERROR.findErrorMessage(),e));

      }
   });

}


private boolean reThrow(Exception e){
   String retriableException = e.getClass().getName();
   return retriableExceptions.contains(retriableException);
}

In this way I control execute invocations. I just need two confirmations.

  1. Can I be sure that if in .whenComplete, sr is non-null, that means that is equivalent of onSuccess (from ListenableFutureCallback) callback was called? There will be some business logic if sr is non-null.

  2. I can be sure that if in .whenComplete, th is non-null, that happened either because onFailure (from ListenableFutureCallback) happened or I completed future with failure?


Solution

  • completedFuture

    You are returning an already completed future, regardless of whether the callback has run yet; so the caller will always see a completed future.

    You should return future from the execute method, then return future.completable() from the asyncSend.

    Note that, starting with version 3.0, the template returns a CompletableFuture instead of a ListenableFuture.

    EDIT

    I just tested it and it works as expected...

    @SpringBootApplication
    @EnableAsync
    public class So75823718Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So75823718Application.class, args);
        }
    
        @Bean
        NewTopic topic() {
            return TopicBuilder.name("so75823718").partitions(1).replicas(1).build();
        }
    
        @Bean
        ApplicationRunner runner(Foo foo) {
            return args -> {
                CompletableFuture<SendResult<String, String>> future = foo.asyncSend("foo");
                future.whenComplete((sr, ex) -> System.out.println(future + ": " + sr.getRecordMetadata()));
            };
        }
    
        @Bean
        TaskExecutor exec() {
            return new SimpleAsyncTaskExecutor();
        }
    
    }
    
    @Component
    class Foo {
    
        RetryTemplate template = new RetryTemplate();
    
        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;
    
        @Async("exec")
        public CompletableFuture<SendResult<String, String>> asyncSend(final String message) throws Exception {
    
            return this.template.execute(ctx -> {
                final ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("so75823718", "foo");
    
                return future;
            }).completable();
        }
    
    }
    
    java.util.concurrent.CompletableFuture@7ebce1f7[Completed normally]: so75823718-0@12
    

    EDIT2

    ...and to catch direct exceptions on the send()...

    public CompletableFuture<SendResult<String, String>> asyncSend(final String message) throws Exception {
    
        CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();
        try {
            CompletableFuture<SendResult<String, String>> fut = this.template.execute(ctx -> {
                return kafkaTemplate.send("so75823718", "foo");
            }).completable();
            fut.whenComplete((sr, ex) -> {
                if (ex != null) {
                    future.completeExceptionally(ex);
                }
                else {
                    future.complete(sr);
                }
            });
        }
        catch (Exception ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }