I have many transactional consumers with a ChainedKafkaTransactionManager
based on a JpaTransactionManager
and a KafkaTransactionManager
(all @KafkaListener
's).
The JPA one needs a ThreadLocal variable to be set, to be able to know to which DB to connect to (is the tenant id).
When starting the application, in the onPartitionsAssigned
listener, spring-kafka is trying to create a chained txn, hence trying to create a JPA txn, but there's no tenant set, then it fails.
That tenant is set through a http filter and/or kafka interceptors (through event headers).
I tried using the auto-wired KafkaListenerEndpointRegistry
with setAutoStartup(false)
, but I see that the consumers don't receive any events, probably because they aren't initialized yet (I thought they were initialized on-demand).
If I set a mock tenant id and call registry.start()
when the application is ready, the initializations seem to be done in other threads (probably because I'm using a ConcurrentKafkaListenerContainerFactory
), so it doesn't work.
Is there a way to avoid the JPA transaction on that initial onPartitionsAssigned
listener, that is part of the consumer initialization?
If your chained TM has the KafkaTM first, followed by JPA TM (which would be the normal case), you can achieve similar functionality by just injecting the Kafka TM into the container and using @Transactional
(with just the JPA TM on the listener) to start the JPA transaction when the listener is called.
The time between the transaction commits will be marginally increased but it would provide similar functionality.
If that won't work for you, open a GitHub issue; we can either disable the initial commit on assignment, or do it without a transaction at all (optionally).