I have to write a method to commit/send all the Kafka producer records on success and rollback all the records on failures based on the Kafka related exception occurred using the latest version of Spring Boot (Preferred) or with Java configuration.
And I am planning to follow the same approach in this Spring Boot ref, but not able to find out an example code.
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
The NEW approach (Recently added)
public ProducerResult produceData(String topic, List<Data> data) {
data.forEach(
d -> {
final ProducerRecord<String, Data> record = createRecord(topic, d);
CompletableFuture<SendResult<Integer, Data>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex != null) {
//Need to roll back the transaction
return new ProducerResult(false);
}
});
}
);
//Need to commit the transaction
return new ProducerResult(true);
}
The method createRecord(topic, d)
returns new ProducerRecord<>(topic, d)
And then how to proceed with template.executeInTransaction?
You need to describe your scenario in more detail.
For producer-only transactions, just use the normal Spring @Transactional
annotation on the method and all sends within the method will participate in the transaction.
Or you can use KafkaTemplate.executeInTransaction()
; see https://docs.spring.io/spring-kafka/reference/html/#kafkatemplate-local-transactions
When the method exits, the transaction will be committed (or rolled back if an exception is thrown).
For consume->process->produce
scenarios, the situation is similar but, instead of using @Transactional
the listener container will start the transaction before calling the listener method.
EDIT
Here is an example of using local transactions; if the send, or getting the result of a future, throws an exception, the transaction will be rolled back; otherwise it will be committed.
@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
List<String> data = List.of("foo", "bar");
return args -> {
boolean success = template.executeInTransaction(t -> {
List<CompletableFuture<SendResult<String, String>>> futures = new ArrayList<>();
data.forEach(item-> {
futures.add(t.send("so75910507", item));
});
futures.forEach(future -> {
try {
future.get(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
});
return true;
});
};
}