Tags: spring-boot, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream Kafka Binder KafkaTransactionManager results in a cycle in application context


I am setting up a basic Spring Cloud Stream producer with Kafka. The intent is to accept an HTTP POST, save the posted data to a database with Spring Data JPA, and write the result to a Kafka topic using the Spring Cloud Stream Kafka Binder. I am following the latest binder documentation on how to set up a KafkaTransactionManager, but this code fails on application startup with the following error:

***************************
APPLICATION FAILED TO START
***************************

Description:

The dependencies of some of the beans in the application context form a cycle:

┌─────┐
|  kafkaTransactionManager defined in com.example.tx.Application
↑     ↓
|  org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration
└─────┘

I have the following bean defined in my Application class, which matches the one in the documentation.

@Bean
public KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionIdPrefix("tx-test");
    return tm;
}

It seems that calling getBinder causes Spring to create the context again. How can I resolve this circular dependency?

Dependencies: Spring Boot parent 2.4.6; Spring Cloud BOM 2020.0.3


Solution

  • Something must have changed in one of the layers; here is a workaround:

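    @Bean
    SmartInitializingSingleton ktmProvider(BinderFactory binders, GenericApplicationContext context) {
        // The callback runs after all regular singletons have been instantiated.
        return () -> {
            // Register the transaction manager late, passing the binder's
            // transactional producer factory as the constructor argument.
            context.registerBean("kafkaTransactionManager", KafkaTransactionManager.class,
                    ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
                            .getTransactionalProducerFactory());
            // Fetching the bean instantiates it; then set the prefix.
            context.getBean(KafkaTransactionManager.class).setTransactionIdPrefix("tx-test");
        };
    }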

    That is, wait until the other singleton beans have been created before registering and configuring the transaction manager.
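
    To illustrate why deferring the registration can break a cycle like the one above, here is a toy model in plain Java. It is only a sketch under stated assumptions: ToyContainer and all of its methods are hypothetical and are not Spring's implementation, and the two bean names simply mirror the ones in the error report. It assumes the auto-configuration uses a transaction manager only when one is defined, and that creating the transaction manager pulls in the auto-configuration; the real dependency chain inside Spring Boot and the binder is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Toy model only: a hypothetical mini-container, NOT Spring's BeanFactory.
final class ToyContainer {
    private final Map<String, Function<ToyContainer, Object>> definitions = new LinkedHashMap<>();
    private final Map<String, Object> singletons = new HashMap<>();
    private final Set<String> inCreation = new HashSet<>();
    private final List<Runnable> afterSingletons = new ArrayList<>();

    void define(String name, Function<ToyContainer, Object> factory) {
        definitions.put(name, factory);
    }

    boolean isDefined(String name) {
        return definitions.containsKey(name);
    }

    // Callbacks registered here run once every defined bean has been created.
    void afterSingletonsInstantiated(Runnable callback) {
        afterSingletons.add(callback);
    }

    Object get(String name) {
        if (singletons.containsKey(name)) {
            return singletons.get(name);
        }
        Function<ToyContainer, Object> factory = definitions.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("No bean named '" + name + "'");
        }
        // A bean requested again while it is still being created means a cycle.
        if (!inCreation.add(name)) {
            throw new IllegalStateException("Cycle detected while creating '" + name + "'");
        }
        try {
            Object bean = factory.apply(this);
            singletons.put(name, bean);
            return bean;
        } finally {
            inCreation.remove(name);
        }
    }

    void refresh() {
        // Iterate over a copy so callbacks may add definitions afterwards.
        for (String name : new ArrayList<>(definitions.keySet())) {
            get(name);
        }
        for (Runnable callback : afterSingletons) {
            callback.run();
        }
    }

    // Assumption: the auto-configuration picks up a transaction manager only if one is defined.
    static Object autoConfig(ToyContainer c) {
        Object tm = c.isDefined("kafkaTransactionManager") ? c.get("kafkaTransactionManager") : null;
        return "autoConfig(tm=" + tm + ")";
    }

    // Both beans defined up front: each one needs the other while it is being created.
    static ToyContainer eagerSetup() {
        ToyContainer c = new ToyContainer();
        c.define("kafkaTransactionManager", ctx -> { ctx.get("autoConfig"); return "tm"; });
        c.define("autoConfig", ToyContainer::autoConfig);
        return c;
    }

    // The transaction manager is defined only in the post-instantiation callback.
    static ToyContainer deferredSetup() {
        ToyContainer c = new ToyContainer();
        c.define("autoConfig", ToyContainer::autoConfig);
        c.afterSingletonsInstantiated(() -> {
            c.define("kafkaTransactionManager", ctx -> { ctx.get("autoConfig"); return "tm"; });
            c.get("kafkaTransactionManager");
        });
        return c;
    }

    public static void main(String[] args) {
        try {
            eagerSetup().refresh();
        } catch (IllegalStateException e) {
            System.out.println("eager: " + e.getMessage());
        }
        ToyContainer deferred = deferredSetup();
        deferred.refresh();
        System.out.println("deferred: " + deferred.get("kafkaTransactionManager"));
    }
}
```

    In the eager setup each bean asks for the other while it is still being created, so refresh() fails. In the deferred setup the auto-configuration is finished before the transaction manager is defined, so nothing is left to wait on. One consequence in this toy model is that the auto-configuration never sees the transaction manager; whether that matters in a real application depends on which components need it.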