I am using Spring Boot 2.5.2 and Spring Cloud 2020.0.3. I am attempting to wrap a REST service call which saves a record to a DB using JPA (`CrudRepository.save`) and then uses `StreamBridge` to post a message to a Kafka topic using spring-cloud-stream (Kafka binder). I have tried several things, but nothing seems to work quite right. I am intentionally causing a JPA failure (inserting a row that violates a unique key constraint), but the Kafka message still goes out to the broker.
What is the proper way to configure this type of flow so that the writes to the DB and the broker are atomic?
HTTP -> JPA save -> Kafka send
You don't need a transaction manager, but you do need a `transactional.id` on the producer factory.
If the send is performed within the scope of the JPA transaction (e.g. a `@Transactional` method running under the JPA transaction manager), the `KafkaTemplate` will synchronize the Kafka transaction with the existing transaction and either commit it or roll it back, depending on the outcome of the main transaction.
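For reference, if you are using a plain `KafkaTemplate` rather than `StreamBridge`, the transactional id prefix can be set with Spring Boot's producer property; a minimal sketch (the `tx.` prefix value is arbitrary):

```properties
# enables a transactional producer factory for KafkaTemplate;
# the "tx." prefix value is arbitrary
spring.kafka.producer.transaction-id-prefix=tx.
```

With the binder, the equivalent setting is the binder's `transaction.transaction-id-prefix`, shown in the configuration later in this answer.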
Are you aware that even rolled-back records are actually written to the log? You must set the consumer property `isolation.level` to `read_committed` so that consumers do not receive rolled-back records; it defaults to `read_uncommitted`.
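With the binder, arbitrary Kafka client properties can be passed through the binder configuration; a minimal sketch for the consumer side (this applies the setting to all clients created by the binder):

```properties
# passed through to the Kafka clients created by the binder;
# consumers will then skip records from aborted transactions
spring.cloud.stream.kafka.binder.configuration.isolation.level=read_committed
```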
EDIT
There is a bug when synchronizing producer-only transactions with an existing transaction; the send is performed in a local transaction instead.
As a work-around, you can use a `TransactionTemplate` to start the Kafka transaction yourself:
```java
@SpringBootApplication
public class So68460690Application {

    public static void main(String[] args) {
        SpringApplication.run(So68460690Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(StreamBridge bridge, Foo foo, KafkaTransactionManager<byte[], byte[]> ktm) {
        return args -> {
            new TransactionTemplate(ktm).executeWithoutResult(
                    status -> foo.doInTx(bridge)); // or execute() to return a result
        };
    }

    @Bean
    KafkaTransactionManager<byte[], byte[]> binderTM(BinderFactory bf) {
        return new KafkaTransactionManager<>(((KafkaMessageChannelBinder) bf.getBinder("kafka", MessageChannel.class))
                .getTransactionalProducerFactory());
    }

}
```
```java
@Component
class Foo {

    @Transactional
    public void doInTx(StreamBridge bridge) {
        bridge.send("output", "test");
        throw new RuntimeException("testEx");
    }

}
```
```properties
spring.cloud.stream.bindings.output.destination=so68460690
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.
spring.cloud.stream.kafka.binder.configuration.acks=all
logging.level.org.springframework.kafka=trace
```
The log shows the Kafka transaction being started and then aborted when the exception is thrown:

```
2021-07-27 17:31:37.923 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] beginTransaction()
2021-07-27 17:31:37.924 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0]]
2021-07-27 17:31:37.927 DEBUG 55933 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-07-27 17:31:37.928 DEBUG 55933 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@56c8e6f0] abortTransaction()
```