java spring-boot spring-cloud spring-kafka kafka-producer-api

How to create N KafkaTemplate beans dynamically at runtime in Spring Boot


I have a Spring Boot application that needs to connect to N Kafka clusters. Depending on some condition, it has to switch to the matching KafkaTemplate and send a message.

I have seen solutions that declare a separate KafkaTemplate bean for each cluster, but in my use case the number of clusters can change at deployment time.

For example:

@Bean(name = "cluster1")
public KafkaTemplate<String, String> kafkaTemplatesample1() {
    return new KafkaTemplate<>(devProducerFactory1());
}

@Bean(name = "cluster2")
public KafkaTemplate<String, String> kafkaTemplatesample2() {
    return new KafkaTemplate<>(devProducerFactory2());
}

Is there another way to do this? Sample code would be much appreciated.


Solution

  • Let's assume that each cluster can be described with the following attributes:

    @Getter
    @Setter
    public class KafkaCluster {
      private String beanName;
      private List<String> bootstrapServers;
    }
    

    For example, two clusters are defined in application.properties:

    kafka.clusters[0].bean-name=cluster1
    kafka.clusters[0].bootstrap-servers=CLUSTER_1_URL
    kafka.clusters[1].bean-name=cluster2
    kafka.clusters[1].bootstrap-servers=CLUSTER_2_URL
    

    These properties are needed to register the KafkaTemplate bean definitions, which happens before any beans are instantiated, so @ConfigurationProperties is unsuitable here. Instead, the Binder API is used to bind them programmatically.
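
    To make the target of that binding concrete, here is a plain-Java sketch of the mapping from indexed keys to a list of objects. It is not how Binder is implemented, only an illustration of the result it is expected to produce for the properties above. ClusterProps and IndexedPropertiesSketch are made-up names, and splitting bootstrap-servers on commas is an assumption about how a multi-broker value would be written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical stand-in for KafkaCluster, written without Lombok.
class ClusterProps {
  final String beanName;
  final List<String> bootstrapServers;

  ClusterProps(String beanName, List<String> bootstrapServers) {
    this.beanName = beanName;
    this.bootstrapServers = bootstrapServers;
  }
}

class IndexedPropertiesSketch {

  // Walks prefix[0], prefix[1], ... and stops at the first index that has no bean-name.
  static List<ClusterProps> readClusters(Properties props, String prefix) {
    List<ClusterProps> clusters = new ArrayList<>();
    for (int i = 0; ; i++) {
      String base = prefix + "[" + i + "]";
      String beanName = props.getProperty(base + ".bean-name");
      if (beanName == null) {
        break;
      }
      String servers = props.getProperty(base + ".bootstrap-servers", "");
      // Assumption: a multi-broker value is comma-separated, one entry per broker address.
      List<String> serverList = Arrays.stream(servers.split(","))
          .map(String::trim)
          .filter(s -> !s.isEmpty())
          .collect(Collectors.toList());
      clusters.add(new ClusterProps(beanName, serverList));
    }
    return clusters;
  }
}
```

    Adding a third cluster then only means adding kafka.clusters[2].* entries; no Java code has to change.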

    The KafkaTemplate bean definitions can be registered in an implementation of the BeanDefinitionRegistryPostProcessor interface.

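    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.beans.factory.support.BeanDefinitionRegistry;
    import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
    import org.springframework.beans.factory.support.GenericBeanDefinition;
    import org.springframework.boot.context.properties.bind.Bindable;
    import org.springframework.boot.context.properties.bind.Binder;
    import org.springframework.core.env.Environment;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    public class KafkaTemplateDefinitionRegistrar implements BeanDefinitionRegistryPostProcessor {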
    
      private final List<KafkaCluster> clusters;
    
      public KafkaTemplateDefinitionRegistrar(Environment environment) {
        clusters = Binder.get(environment)
            .bind("kafka.clusters", Bindable.listOf(KafkaCluster.class))
            .orElseThrow(() -> new IllegalStateException("No kafka.clusters properties found"));
      }
    
      @Override
      public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
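        clusters.forEach(cluster -> {
          // One bean definition per configured cluster, named after its bean-name property.
          GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
          beanDefinition.setBeanClass(KafkaTemplate.class);
          // The supplier defers creating the template until the bean is actually instantiated.
          beanDefinition.setInstanceSupplier(() -> kafkaTemplate(cluster));
          registry.registerBeanDefinition(cluster.getBeanName(), beanDefinition);
        });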
      }
    
      @Override
      public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        // Nothing to do here: only bean definitions are registered.
      }
    
      public ProducerFactory<String, String> producerFactory(KafkaCluster kafkaCluster) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaCluster.getBootstrapServers());
        configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
      }
    
      public KafkaTemplate<String, String> kafkaTemplate(KafkaCluster kafkaCluster) {
        return new KafkaTemplate<>(producerFactory(kafkaCluster));
      }
    }
    

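    Configuration class for the KafkaTemplateDefinitionRegistrar bean. The @Bean method is static so that the post-processor can be created early in the container lifecycle, without first instantiating the configuration class: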

    @Configuration
    public class KafkaTemplateDefinitionRegistrarConfiguration {
      @Bean
      public static KafkaTemplateDefinitionRegistrar beanDefinitionRegistrar(Environment environment) {
        return new KafkaTemplateDefinitionRegistrar(environment);
      }
    }
    

    Additionally, exclude KafkaAutoConfiguration in the main class to prevent the default KafkaTemplate bean from being created. This is probably not the best approach, because none of the other beans provided by KafkaAutoConfiguration are created either.

    @SpringBootApplication(exclude={KafkaAutoConfiguration.class})
    

    Finally, below is a simple test that verifies that two KafkaTemplate beans exist.

    @SpringBootTest
    class SpringBootApplicationTest {
        @Autowired
        List<KafkaTemplate<String,String>> kafkaTemplates;
        
        @Test
        void kafkaTemplatesSizeTest() {
            // JUnit expects the expected value first, then the actual value.
            Assertions.assertEquals(2, kafkaTemplates.size());
        }
    }
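
    The question also asks how to switch templates based on a condition. One possible approach, sketched below, is to keep the templates in a map keyed by cluster name and look up the right one at send time. ClusterRouter is a hypothetical helper, not part of Spring; it is generic only so that the sketch compiles without Spring on the classpath. In the application, T would be KafkaTemplate<String, String>.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: resolves the template registered for a cluster name.
class ClusterRouter<T> {

  private final Map<String, T> templatesByName;

  ClusterRouter(Map<String, T> templatesByName) {
    // Defensive copy so later changes to the caller's map do not affect routing.
    this.templatesByName = new HashMap<>(templatesByName);
  }

  // Returns the template for the given cluster name, or fails fast if none is registered.
  T forCluster(String clusterName) {
    T template = templatesByName.get(clusterName);
    if (template == null) {
      throw new IllegalArgumentException(
          "No KafkaTemplate registered for cluster: " + clusterName);
    }
    return template;
  }
}
```

    When Spring injects a Map<String, KafkaTemplate<String, String>> into a constructor, the keys are the bean names, so with the registrar above the keys should be cluster1 and cluster2. A call such as router.forCluster("cluster1").send(topic, message) would then pick the template for the first cluster. This assumes that map injection matches the dynamically registered beans the same way the List injection in the test does.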
    

    For reference: Create N number of beans with BeanDefinitionRegistryPostProcessor, Spring Boot Dynamic Bean Creation From Properties File