My spring boot service needs to consume kafka events off one topic, do some processing (including writing to the db with JPA) and then produce some events on a new topic. No matter what happens I cannot have a situation where I have published events without updating the database, and if anything goes wrong then I want the next poll of the consumer to retry the event. My processing logic including the db update is idempotent so retrying that is fine
I think I have achieved exactly once semantics as described on by using a ChainedKafkaTransactionManager like so:
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
return new ChainedKafkaTransactionManager(kafka, jpa);
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
ChainedKafkaTransactionManager chainedTransactionManager) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
The relevant kafka config in my application.yaml file looks like:
group-id: myGroupId
auto-offset-reset: earliest
isolation.level: read_committed
transaction-id-prefix: ${random.uuid}
Because the commit order is critical to my application I would like to write a integration test to prove that the commits happen in the desired order and that if an error occurs during the commit to kafka then the original event is consumed again. However I am struggling to find a good way of causing a failure between the db commit and the kafka commit.
Any suggestions or alternative ways I could do this?
You could use a custom ProducerFactory
to return a MockProducer
(provided by kafka-clients
Set the commitTransactionException
so that it is thrown when the KTM tries to commit the transaction.
Here is an example; it doesn't use the chained TM, but that shouldn't make a difference.
public class So66018178Application {
public static void main(String[] args) {, args);
@KafkaListener(id = "so66018178", topics = "so66018178")
public void listen(String in) {
@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {
EmbeddedKafkaBroker broker;
void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
throws InterruptedException {
AtomicReference<Exception> listenerException = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
.setAfterRollbackProcessor(new AfterRollbackProcessor<>() {
public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
Exception exception, boolean recoverable) {
Map<String, Object> props = KafkaTestUtils.producerProps(;
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
template.send("so66018178", "test");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
public static class Config {
RuntimeException exception = new RuntimeException("test");
public ProducerFactory<Object, Object> pf() {
return new ProducerFactory<>() {
public Producer<Object, Object> createProducer() {
MockProducer<Object, Object> mockProducer = new MockProducer<>();
mockProducer.commitTransactionException = Config.this.exception;
return mockProducer;
public Producer<Object, Object> createProducer(String txIdPrefix) {
Producer<Object, Object> producer = createProducer();
return producer;
public boolean transactionCapable() {
return true;