Spring Kafka, and thus Spring Cloud Stream, allows us to create transactional Producers and Processors. We can see that functionality in action in one of the sample projects (https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples):
    @Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());
        if (shouldFail.get()) {
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
        } else {
            // We fail every other request as a test
            shouldFail.set(true);
        }
        logger.info("Saving person={}", person);
        Person savedPerson = repository.save(person);
        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }
In this excerpt, there's a read from a Kafka topic, a write to a database and a write to another Kafka topic, all done transactionally.
What I would like to understand is how that is technically achieved and implemented.
Since the datasource and Kafka don't participate in an XA transaction (two-phase commit), how does the implementation guarantee that a local transaction can read from Kafka, commit to a database and write to Kafka, all transactionally?
There is no such guarantee across the database and Kafka; the only guarantee is within Kafka itself.
Spring provides transaction synchronization so that the commits happen close together, but it is still possible for the DB to commit and Kafka not to. So you have to deal with the possibility of duplicates.
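One common way to deal with such duplicates is to make the database write idempotent. The following is only a minimal sketch in plain Java, not code from the sample project or from Spring: the class `IdempotentPersonHandler`, the `eventId` key and the in-memory collections are all hypothetical stand-ins. It assumes each event carries a unique id and that the set of processed ids is stored in the same DB transaction as the person row.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these names come from Spring or the sample project. */
class IdempotentPersonHandler {

    // Stands in for a "processed events" table updated in the same DB transaction.
    private final Set<String> processedEventIds = new HashSet<>();

    // Stands in for the person table.
    private final List<String> savedNames = new ArrayList<>();

    /**
     * Applies the DB write for one record.
     * Returns true if the write was applied, false if the record
     * was recognized as a redelivery and the write was skipped.
     */
    boolean handle(String eventId, String name) {
        // Set.add returns false when the id is already present.
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        savedNames.add(name);
        return true;
    }

    List<String> savedNames() {
        return savedNames;
    }

    public static void main(String[] args) {
        IdempotentPersonHandler handler = new IdempotentPersonHandler();
        // First delivery: the DB commit succeeds; assume the Kafka commit then fails.
        System.out.println(handler.handle("evt-1", "Alice")); // prints true
        // Kafka redelivers the record because its offset was never committed.
        System.out.println(handler.handle("evt-1", "Alice")); // prints false
        System.out.println(handler.savedNames());             // prints [Alice]
    }
}
```

In a real listener the output event would presumably still be published on the redelivery, since the aborted Kafka transaction never made the first one visible; only the database write is skipped.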
The correct way to do this, when using spring-kafka directly, is NOT with @Transactional but to use a ChainedKafkaTransactionManager in the listener container.
See Transaction Synchronization.
Also see Distributed transactions in Spring, with and without XA and the "Best Efforts 1PC pattern" for background.
However, with Stream, there is no support for the chained transaction manager, so the @Transactional is required (with the DB transaction manager). This will provide similar results to the chained transaction manager, with the DB committing first, just before Kafka.
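To make the failure window concrete, here is a small plain-Java model of committing two local transactions one after the other. It is only an illustration under stated assumptions: `BestEffortCommitDemo`, `Resource` and `commitInOrder` are hypothetical names, not Spring classes, and the Kafka failure is simulated with a flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of sequential, non-XA commits; not a Spring API. */
class BestEffortCommitDemo {

    /** A resource with its own local transaction. */
    static class Resource {
        final String name;
        final boolean failOnCommit;
        boolean committed = false;

        Resource(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void commit() {
            if (failOnCommit) {
                throw new IllegalStateException(name + " commit failed");
            }
            committed = true;
        }
    }

    /** Commits each resource in order; returns the names committed before any failure. */
    static List<String> commitInOrder(List<Resource> resources) {
        List<String> done = new ArrayList<>();
        for (Resource r : resources) {
            try {
                r.commit();
                done.add(r.name);
            } catch (IllegalStateException e) {
                // Nothing can undo the earlier commit: this is the gap left without XA.
                break;
            }
        }
        return done;
    }

    public static void main(String[] args) {
        Resource db = new Resource("db", false);
        Resource kafka = new Resource("kafka", true); // simulated failure at commit time
        System.out.println(commitInOrder(Arrays.asList(db, kafka))); // prints [db]
    }
}
```

When the second commit fails, the database change stays committed while the Kafka offsets do not, so the record is delivered again.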