Tags: apache-kafka, spring-cloud, spring-cloud-stream, spring-kafka

How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka?


Spring Kafka, and thus Spring Cloud Stream, allows us to create transactional producers and processors. We can see this functionality in action in one of the sample projects, https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:

    @Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());

        if(shouldFail.get()) {
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
        } else {
            //We fail every other request as a test
            shouldFail.set(true);
        }
        logger.info("Saving person={}", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }

In this excerpt there is a read from a Kafka topic, a write to a database, and another write to a different Kafka topic, all of it done transactionally.

What I wonder, and would like to have answered, is how this is technically achieved and implemented.

Since the datasource and Kafka don't participate in an XA transaction (two-phase commit), how does the implementation guarantee that a local transaction can read from Kafka, commit to a database, and write to Kafka, all of this transactionally?


Solution

  • There is no such guarantee; atomicity is guaranteed only within Kafka itself.

    Spring provides transaction synchronization, so the two commits happen close together, but it is possible for the DB transaction to commit while the Kafka transaction does not. So you have to deal with the possibility of duplicates (see the idempotency sketch at the end of this answer).

    The correct way to do this, when using spring-kafka directly, is NOT with @Transactional but to use a ChainedKafkaTransactionManager in the listener container (see the first sketch below).

    See Transaction Synchronization.

    Also see "Distributed transactions in Spring, with and without XA" and the "Best Efforts 1PC pattern" for background.

    However, with Stream there is no support for the chained transaction manager, so @Transactional is required (with the DB transaction manager). This provides results similar to the chained transaction manager, with the DB committing first, just before Kafka (see the second sketch below).
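
Below is a minimal configuration sketch of the ChainedKafkaTransactionManager approach for plain spring-kafka (not Stream). It assumes a DataSource, a transactional ProducerFactory (one with a transaction-id prefix), and a ConsumerFactory are already defined as beans; the bean names and generic types are illustrative, not taken from the sample project.

    import javax.sql.DataSource;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
    import org.springframework.kafka.transaction.KafkaTransactionManager;

    @Configuration
    public class ChainedTxConfig {

        @Bean
        public KafkaTransactionManager<String, String> kafkaTxManager(
                ProducerFactory<String, String> producerFactory) {
            // The producer factory must be transactional (transaction-id prefix set).
            return new KafkaTransactionManager<>(producerFactory);
        }

        @Bean
        public DataSourceTransactionManager dbTxManager(DataSource dataSource) {
            return new DataSourceTransactionManager(dataSource);
        }

        @Bean
        public ChainedKafkaTransactionManager<String, String> chainedTxManager(
                KafkaTransactionManager<String, String> kafkaTxManager,
                DataSourceTransactionManager dbTxManager) {
            // Transactions are started in the order given and committed in reverse,
            // so the DB commits first, immediately followed by Kafka.
            return new ChainedKafkaTransactionManager<>(kafkaTxManager, dbTxManager);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory,
                ChainedKafkaTransactionManager<String, String> chainedTxManager) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // The listener container drives the chained transaction; the listener
            // method itself does not need @Transactional.
            factory.getContainerProperties().setTransactionManager(chainedTxManager);
            return factory;
        }
    }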
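
For the Stream case, here is a sketch of how the sample's approach fits together: the Kafka binder is made transactional through its transaction-id-prefix property, and @Transactional is bound to the DB transaction manager. The prefix value and the "transactionManager" qualifier below are illustrative assumptions; with a single DB transaction manager, a plain @Transactional as in the sample is enough. The Person/PersonEvent types mirror the sample project.

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.transaction.annotation.Transactional;

    // Binder side (application.yml), illustrative prefix value:
    //   spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
    // With a prefix set, the binder's producer is transactional, so the consumed
    // offset and the outgoing record are committed as one Kafka transaction.
    @EnableBinding(Processor.class)
    public class TransactionalProcessor {

        private final PersonRepository repository;  // JPA repository as in the sample

        public TransactionalProcessor(PersonRepository repository) {
            this.repository = repository;
        }

        // "transactionManager" is the auto-configured JPA/DataSource manager; the
        // Kafka transaction is managed by the binder, not by this annotation.
        @Transactional("transactionManager")
        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public PersonEvent process(PersonEvent data) {
            // The DB transaction commits when this method returns, just before the
            // binder commits the Kafka transaction (offsets plus the outgoing record).
            // A crash between those two commits is what can produce duplicates.
            Person person = new Person();
            person.setName(data.getName());
            Person saved = repository.save(person);

            PersonEvent event = new PersonEvent();
            event.setName(saved.getName());
            event.setType("PersonSaved");
            return event;
        }
    }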
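
Finally, one hedged way to "deal with the possibility of duplicates" is to make the downstream consumer idempotent. The sketch below records each processed event key in a table with a unique constraint, so a redelivered record is detected and skipped; the topic name, table, Postgres ON CONFLICT syntax, and the assumption that records carry a unique key are all illustrative, not part of the sample project.

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.transaction.annotation.Transactional;

    public class IdempotentPersonEventListener {

        private final JdbcTemplate jdbc;

        public IdempotentPersonEventListener(JdbcTemplate jdbc) {
            this.jdbc = jdbc;
        }

        @KafkaListener(topics = "person-events")
        @Transactional
        public void onEvent(ConsumerRecord<String, PersonEvent> record) {
            // Insert into a table with a unique constraint on the event key; a
            // redelivered (duplicate) record hits the constraint and is skipped.
            int inserted = jdbc.update(
                    "INSERT INTO processed_event (event_key) VALUES (?) ON CONFLICT DO NOTHING",
                    record.key());
            if (inserted == 0) {
                return;  // already handled in an earlier delivery
            }
            // ... apply the event's side effects inside the same DB transaction ...
        }
    }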