Search code examples
javaspring-bootapache-kafkaspring-kafka

How to implement Kafka Transactions with Transaction Manager in Spring for Apache Kafka


I have to write a method to commit/send all the Kafka producer records on success and rollback all the records on failures based on the Kafka related exception occurred using the latest version of Spring Boot (Preferred) or with Java configuration.

And I am planning to follow the same approach in this Spring Boot ref, but not able to find out an example code.

public interface ProducerListener<K, V> {

void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
        Exception exception);

}

The NEW approach (Recently added)

public ProducerResult produceData(String topic, List<Data> data) {

    data.forEach(
            d -> {
                final ProducerRecord<String, Data> record = createRecord(topic, d);
                CompletableFuture<SendResult<Integer, Data>> future = template.send(record);

                future.whenComplete((result, ex) -> {
                    if (ex != null) {
                        //Need to roll back the transaction
                        return new ProducerResult(false);
                    }
                });
            }
    );
    //Need to commit the transaction
    return new ProducerResult(true);
}

The method createRecord(topic, d) returns new ProducerRecord<>(topic, d) And then how to proceed with template.executeInTransaction?


Solution

  • You need to describe your scenario in more detail.

    For producer-only transactions, just use the normal Spring @Transactional annotation on the method and all sends within the method will participate in the transaction.

    Or you can use KafkaTemplate.executeInTransaction(); see https://docs.spring.io/spring-kafka/reference/html/#kafkatemplate-local-transactions

    When the method exits, the transaction will be committed (or rolled back if an exception is thrown).

    For consume->process->produce scenarios, the situation is similar but, instead of using @Transactional the listener container will start the transaction before calling the listener method.

    EDIT

    Here is an example of using local transactions; if the send, or getting the result of a future, throws an exception, the transaction will be rolled back; otherwise it will be committed.

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        List<String> data = List.of("foo", "bar");
        return args -> {
            boolean success = template.executeInTransaction(t -> {
                List<CompletableFuture<SendResult<String, String>>> futures = new ArrayList<>();
                data.forEach(item-> {
                    futures.add(t.send("so75910507", item));
                });
                futures.forEach(future -> {
                    try {
                        future.get(10, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                    catch (ExecutionException | TimeoutException e) {
                        throw new IllegalStateException(e);
                    }
                });
                return true;
            });
        };
    }