We're building a small wrapper library around Kafka. We're using older version where there is no .whenComplete((response, throwable) -> {...});
when calling kafkaTemplate.send(...);
.
Instead our library is doing following:
1 Defining a RetryTemplate
bean:
@Bean
public RetryTemplate prodRetryTemplate() {
final RetryTemplate template = RetryTemplate.builder().maxAttempts(3)
.retryOn(TopicAuthorizationException.class).exponentialBackoff(1000, 2, 2000).build();
return template;
}
2 Defining a asynchronous CompletableFuture method:
@Override
@Async(EXECUTOR_SERVICE) //we had defined Executor bean, it is ommited here
public CompletableFuture<DtoResponse> asyncSend(final Integer key, final String topic, final String message)
throws Exception {
final ProducerRecord<K, V> producerRecord = buildRecord(key, topic, message);
DtoRespons[] responseHolder = new DtoRespons[1];
template.execute(ctx ->{
final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(final SendResult<Integer, String> result) {
log.info("success...");
responseHolder[0] = DtoResponse.builder()
.error(null)
.success(true)
.msg("PUBLISHED")
.build();
}
@SneakyThrows
@Override
public void onFailure(final Throwable ex) {
log.error("error...)
throwError(producerRecord, eventName, ex); //do additional logging
}
});
return null; //to satisfy execute method from RetryTemplate
});
return CompletableFuture.completedFuture(responseHolder[0]);
}
My question is straightforward, is it possible that once asyncSend
is called that it returns null
i.e. will line return CompletableFuture.completedFuture(null);
be executed before onSuccess
or onFailure
are invoked?
That is, from client side:
public void myService(){
MyLib lib = new MyLib();
CompletableFuture<DtoResponse> future = lib.sendAsync(key, topic, message);
future.whenComplete((result, throwable) -> {
//is it possible that result and throwable both be null ??
});
}
Update 1:
Based on professor's Gary comment. I am not sure if this is what you meant:
@Override
@Async(EXECUTOR_SERVICE)
public CompletableFuture<SendResult<Integer, String>> asyncSend(final Integer key, final String topic, final String message)
throws Exception {
final ProducerRecord<Integer, String> producerRecord = buildRecord(key, topic, message); //this can throw ex
ListenableFuture<SendResult<Integer, String>> x = template.execute(ctx -> {
final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
return future;
});
CompletableFuture<SendResult<Integer, String>> completable = x.completable();
return completable;
}
Then the client code would have to check for throwable
and for SendResult<Integer, String>
within .whenComplete()
. But this is not what I need. With SendResult<Integer, String>
client code still cannot tell whether messages is sent or not. How to circumvent this? Please let me know if I've explained this in understandable way. Also, our library needs to do important logging in onFailure
and onSuccess
Update 2:
After Gary's EDIT.
Thanks Gary for helping. However, at first this works, but problem occurs when exception as thrown. In that case whenComplete
is not invoked, neither is exceptionally
channel invoked. Let me explain.
So since we use secure Kafka in our .yaml
file there are these properties:
properties:
security.protocol: "SASL_SSL"
sasl.mechanism: "OAUTHBEARER"
sasl.login.callback.handler.class: "path.to.LoginCallBack"
sasl.jaas.config: >-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
required
oauth.token.endpoint.uri="sso-uri"
oauth.ssl.endpoint.identification.algorithm=""
oauth.ssl.truststore.type="JKS"
oauth.client.id="kafka-client-id"
oauth.client.secret="someSecret"
oauth.username="username"
oauth.password="password";
I had changed value oauth.client.id
to dummy
in order to provoke org.apache.kafka.common.KafkaException: Failed to construct kafka producer
and to see where the flow will end up.
When line kafkaTemplate.send("so75823718", "foo");
is reached, it throws exception and that exception is not ex
in whenComplete
@Bean
ApplicationRunner runner(Foo foo) {
return args -> {
try {
CompletableFuture<SendResult<String, String>> future
= foo.asyncSend("foo");
future.whenComplete((sr, ex) -> {
if (sr != null){
System.out.println("success");
} else if (ex != null){
System.out.println("fail");
}
});
} catch (Exception e){
System.out.println("not expected nor desired behavior
to have exception here");
e.printStackTrace();
}
}
}
I've tried couple of things, but not worth of showing since they don't even compile. So my question is now, is it somehow possible to not force users of asyncSend
to use try/catch instead just to use whenComplete()
or acceptThen/exceptionally
and that all error handling is done in asyncSend
?
UPDATE 3:
Really hopping that is a final edit.
So I think I managed it like so:
First, template
@Value("${kafkaExceptions:}")
private List<String> exceptions = new ArrayList<>();
@Bean
public RetryTemplate template() throws ClassNotFoundException {
exceptions.add(TopicAuthorizationException.class.getName());
RetryTemplateBuilder retryTemplateBuilder = RetryTemplate.builder().maxAttempts(retries).retryOn(TopicAuthorizationException.class);
for (String exceptionClassName : exceptions){
retryTemplateBuilder.retryOn((Class<? extends Throwable>) Class.forName(exceptionClassName));
}
return retryTemplateBuilder.exponentialBackoff(1000, 2, 2000).build();
}
Then,
@Async("exec")
public CompletableFuture<SendResult<K, V>> asyncSend(K key, String topic, V message, S) throws Exception {
return template.execute(ctx -> {
try {
ProducerRecord<K, V> producerRecord = buildHeader(key, topic, msg); //can throw custom exception
return kafkaTemplate.send(producerRecord).completable();
} catch (Exception e){
if (reThrow(e)) throw e;
return CompletableFuture.failedFuture(new CustomException(Enum.KAFKA_EX_PROD_PUBLISH_ERROR.findErrorMessage(),e));
}
});
}
private boolean reThrow(Exception e){
String retriableException = e.getClass().getName();
return retriableExceptions.contains(retriableException);
}
In this way I control execute
invocations. I just need two confirmations.
Can I be sure that if in .whenComplete
, sr
is non-null, that means that is equivalent of onSuccess
(from ListenableFutureCallback) callback was called? There will be some business logic if sr
is non-null.
I can be sure that if in .whenComplete
, th
is non-null, that happened either because onFailure
(from ListenableFutureCallback) happened or I completed future with failure?
completedFuture
You are returning an already completed future, regardless of whether the callback has run yet; so the caller will always see a completed future.
You should return future
from the execute method, then return future.completable()
from the asyncSend
.
Note that, starting with version 3.0, the template returns a CompletableFuture
instead of a ListenableFuture
.
EDIT
I just tested it and it works as expected...
@SpringBootApplication
@EnableAsync
public class So75823718Application {
public static void main(String[] args) {
SpringApplication.run(So75823718Application.class, args);
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so75823718").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(Foo foo) {
return args -> {
CompletableFuture<SendResult<String, String>> future = foo.asyncSend("foo");
future.whenComplete((sr, ex) -> System.out.println(future + ": " + sr.getRecordMetadata()));
};
}
@Bean
TaskExecutor exec() {
return new SimpleAsyncTaskExecutor();
}
}
@Component
class Foo {
RetryTemplate template = new RetryTemplate();
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Async("exec")
public CompletableFuture<SendResult<String, String>> asyncSend(final String message) throws Exception {
return this.template.execute(ctx -> {
final ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("so75823718", "foo");
return future;
}).completable();
}
}
java.util.concurrent.CompletableFuture@7ebce1f7[Completed normally]: so75823718-0@12
EDIT2
...and to catch direct exceptions on the send()
...
public CompletableFuture<SendResult<String, String>> asyncSend(final String message) throws Exception {
CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();
try {
CompletableFuture<SendResult<String, String>> fut = this.template.execute(ctx -> {
return kafkaTemplate.send("so75823718", "foo");
}).completable();
fut.whenComplete((sr, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
else {
future.complete(sr);
}
});
}
catch (Exception ex) {
future.completeExceptionally(ex);
}
return future;
}