Search code examples
springspring-bootapache-kafkaspring-kafka

Kafka topic not created automatically on remote kafka after spring boot start(and create on local kafka server)


1) I start kafka on my machine

2) I start my spring boot server with config:

@Bean
public NewTopic MyTopic() {
    return new NewTopic("my-topic", 5, (short) 1);
}

@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> unpMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

result - server is start successfully and create my-topic in kafka.

But if I try do it with remote kafka on remote server - topic not create.

and in log spring write:

12:35:09.880 [                  main] [INFO ]    o.a.k.clients.admin.AdminClientConfig: [] AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]

If I add this bean to config:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "remote_host:9092");
    return new KafkaAdmin(configs);
}

topic create succesfully.

1) Why it happens?

2) Do I have to create KafkaAdmin ? why for the local Kafka is not required?

EDDIT

My current config:

spring:
  kafka:
    bootstrap-servers: remote:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

and

@Configuration
public class KafkaTopicConfig {

    @Value("${response.topics.topicName}")
    private String topicName;

    @Bean
    public NewTopic responseTopic() {
        return new NewTopic(topicName, 5, (short) 1);
    }

}

After start I see:

bootstrap.servers = [remote:9092]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536

...

But topic not create


Solution

  • KafkaAdmin is the kafka spring object that looks for NewTopic objects in your spring context and creates them. If you do not have a KafkaAdmin no creation will take place. You can explicitly create KafkaAdmin (as you show in your code snippet) or indirectly order its creation via the spring kafka configuration properties.

    KafkaAdmin is a nice to have it is not related to production or consumption to/ from topics for your application code.

    EDIT

    You must have something wrong; I just tested it...

    spring:
      kafka:
        bootstrap-servers: remote:9092
    

    and

    2019-03-21 09:18:18.354  INFO 58301 --- [           main] o.a.k.clients.admin.AdminClientConfig    
            : AdminClientConfig values: 
        bootstrap.servers = [remote:9092]
        ...