I have a Spring Boot application that needs to connect to N Kafka clusters. Based on some condition, it has to pick the right KafkaTemplate and send a message.
I have seen solutions that create a separate KafkaTemplate bean per cluster, but in my use case the number of clusters is only known at deployment time.
For example:
@Bean(name = "cluster1")
public KafkaTemplate<String, String> kafkaTemplatesample1() {
    return new KafkaTemplate<>(devProducerFactory1());
}

@Bean(name = "cluster2")
public KafkaTemplate<String, String> kafkaTemplatesample2() {
    return new KafkaTemplate<>(devProducerFactory2());
}
Is there any other solution for this? Sample code would be much appreciated.
Let's assume that each cluster can be described with the following attributes:
@Getter
@Setter
public class KafkaCluster {
    private String beanName;
    private List<String> bootstrapServers;
}
For example, two clusters are defined in application.properties:
kafka.clusters[0].bean-name=cluster1
kafka.clusters[0].bootstrap-servers=CLUSTER_1_URL
kafka.clusters[1].bean-name=cluster2
kafka.clusters[1].bootstrap-servers=CLUSTER_2_URL
These properties are needed before any beans are instantiated, because they drive the registration of the KafkaTemplate bean definitions. That makes @ConfigurationProperties unsuitable for this case. Instead, the Binder API is used to bind them programmatically.
The KafkaTemplate bean definitions can be registered in an implementation of the BeanDefinitionRegistryPostProcessor interface.
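import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class KafkaTemplateDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor {

    private final List<KafkaCluster> clusters;

    public KafkaTemplateDefinitionRegistrar(Environment environment) {
        // Bind kafka.clusters[*] directly from the Environment; fail if none are configured.
        clusters = Binder.get(environment)
                .bind("kafka.clusters", Bindable.listOf(KafkaCluster.class))
                .orElseThrow(IllegalStateException::new);
    }

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        // Register one KafkaTemplate bean definition per configured cluster.
        clusters.forEach(cluster -> {
            GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
            beanDefinition.setBeanClass(KafkaTemplate.class);
            beanDefinition.setInstanceSupplier(() -> kafkaTemplate(cluster));
            registry.registerBeanDefinition(cluster.getBeanName(), beanDefinition);
        });
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // Nothing to do here; only bean definitions are registered.
    }

    public ProducerFactory<String, String> producerFactory(KafkaCluster kafkaCluster) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    public KafkaTemplate<String, String> kafkaTemplate(KafkaCluster kafkaCluster) {
        return new KafkaTemplate<>(producerFactory(kafkaCluster));
    }
}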
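Configuration class for the KafkaTemplateDefinitionRegistrar bean. The @Bean method is static so that the post-processor can be created early, without instantiating the configuration class first: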
@Configuration
public class KafkaTemplateDefinitionRegistrarConfiguration {

    @Bean
    public static KafkaTemplateDefinitionRegistrar beanDefinitionRegistrar(Environment environment) {
        return new KafkaTemplateDefinitionRegistrar(environment);
    }
}
Additionally, exclude KafkaAutoConfiguration in the main class to prevent the default KafkaTemplate bean from being created. This is probably not the best approach, because none of the other KafkaAutoConfiguration beans are created either.
@SpringBootApplication(exclude={KafkaAutoConfiguration.class})
Finally, below is a simple test that confirms two KafkaTemplate beans exist.
@SpringBootTest
class SpringBootApplicationTest {

    @Autowired
    List<KafkaTemplate<String, String>> kafkaTemplates;

    @Test
    void kafkaTemplatesSizeTest() {
        // JUnit expects (expected, actual) in that order.
        Assertions.assertEquals(2, kafkaTemplates.size());
    }
}
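As for switching between templates based on a condition: Spring can inject all beans of a given type as a Map keyed by bean name, so the templates registered above could be looked up by cluster name at send time. The following is only a minimal sketch of that lookup in plain Java. TemplateRouter and resolve are hypothetical names, not part of Spring or Spring Kafka, and the type parameter T stands in for KafkaTemplate<String, String> so that the snippet compiles without Spring on the classpath.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (an assumption for illustration, not a Spring class).
// T stands in for KafkaTemplate<String, String>.
final class TemplateRouter<T> {

    // Bean name (e.g. "cluster1") -> template registered under that name.
    private final Map<String, T> templatesByName;

    TemplateRouter(Map<String, T> templatesByName) {
        // Defensive copy so later changes to the caller's map do not affect routing.
        this.templatesByName = Map.copyOf(templatesByName);
    }

    // Returns the template for the given cluster name and fails fast on unknown names.
    T resolve(String clusterName) {
        Objects.requireNonNull(clusterName, "clusterName");
        T template = templatesByName.get(clusterName);
        if (template == null) {
            throw new IllegalArgumentException(
                    "No KafkaTemplate registered for cluster: " + clusterName);
        }
        return template;
    }
}
```

In the application, a component could take a Map<String, KafkaTemplate<String, String>> as a constructor argument, hand it to such a router, and call router.resolve(name).send(topic, message) once the condition has produced a cluster name such as "cluster1".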
For reference: Create N number of beans with BeanDefinitionRegistryPostProcessor, Spring Boot Dynamic Bean Creation From Properties File