I use Spring Cloud Stream (Kafka Streams binder) to process a stream. In the middle of the topology I save some data to the DB, then continue processing in the topology, and finally send the final result to another topic.
My Function bean looks something like this:
@Component("handler")
public class Handler implements Function<KStream<Key1, Value1>, KStream<Key2, Value2>> {
    public KStream<Key2, Value2> apply(KStream<Key1, Value1> input) {
        // start the topology and apply a few transformations
        input.mapValues(...).filter...(...)
        // now save the intermediate result to the DB via Spring Data JPA and Hibernate
        someEntityJpaRepo.save()
        // continue with the remaining topology
    }
}
Where should I put the @Transactional annotation if I want all-or-nothing behavior, as in an RDBMS, and which PlatformTransactionManager should I use: JPA? Kafka? For example, if an exception occurs in the DB query or in the stream processing, I want the DB transaction to roll back along with the Kafka transaction.
KafkaTransactionManager is NOT for KStream; Kafka Streams manages its own transactions.
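To illustrate "manages its own transactions": Kafka Streams wraps its consume-process-produce cycle in Kafka transactions only when its processing guarantee is set to exactly-once (the default is at-least-once). A minimal sketch of the relevant settings using plain java.util.Properties; the class name, method name, and example values are hypothetical, and with the Spring Cloud Stream binder the same keys would normally be supplied under spring.cloud.stream.kafka.streams.binder.configuration rather than built by hand:

```java
import java.util.Properties;

public class StreamsEosConfig {

    // Builds Kafka Streams settings that turn on exactly-once processing.
    // The keys are standard Kafka Streams config names; the values passed in
    // are illustrative assumptions, not taken from the question.
    static Properties eosProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // "exactly_once_v2" is available in newer Kafka Streams versions;
        // older versions use "exactly_once" instead.
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }
}
```

This setting covers only the Kafka side (input offsets, state stores, output topics). It does not extend to the JPA write.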
Since Spring is not involved with Kafka Streams at runtime (only for topology setup), there is no concept of transaction synchronization between the two.
The JPA transaction will be independent of the Kafka Streams transaction, and you must deal with the possibility of duplicate deliveries (make the DB update idempotent, or use some other technique to detect that this Kafka record has already been stored to the DB).
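One possible way to detect an already-stored record is to key each DB row on the record's topic, partition, and offset, so that a redelivery hits a uniqueness check instead of creating a second row. The sketch below is only an illustration of that idea: IdempotentSink and saveOnce are hypothetical names, and an in-memory map stands in for a table with a unique constraint on those three columns. In a real topology the coordinates would have to come from the record metadata that the Processor API exposes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentSink {

    // Stand-in for a DB table with a UNIQUE constraint on (topic, partition, offset).
    private final Map<String, String> rows = new ConcurrentHashMap<>();

    // Stores the value unless this exact Kafka record was stored before.
    // Returns true if a new row was written, false if it was a duplicate delivery.
    public boolean saveOnce(String topic, int partition, long offset, String value) {
        String recordId = topic + "-" + partition + "-" + offset;
        // putIfAbsent is atomic: only the first delivery of a record wins.
        return rows.putIfAbsent(recordId, value) == null;
    }

    // Number of distinct records stored so far.
    public int rowCount() {
        return rows.size();
    }
}
```

Calling saveOnce("input-topic", 0, 42L, "intermediate") twice returns true the first time and false the second, and rowCount() stays at 1. With JPA the equivalent would be a unique index on the coordinates plus handling of the constraint violation, or an upsert.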