Search code examples
javaspringspring-bootspring-kafka

Sending message to Kafka on dynamically created topic gives error LEADER_NOT_AVAILABLE


I am trying to send the message on dynamically created topic using below code:

@Service
public class KafkaMessageSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        String topicName =  topic + Integer.valueOf(RandomStringUtils.randomNumeric(1, 4))%100;
        kafkaTemplate.send(topicName, payload);
        LOGGER.info("sent payload to topic='{}'", topicName);
    }
}

When I call this send method with topic = 'somethingnew', I get below error, (its interesting that its able to send some message, but for many it gives the error, and after some time, it starts sending the message on those topic too) but when I restart the server, and call the send method again with topic = 'somethingnew', it works fine! Is there some potential bug in kafka? I am using spring boot version: 1.5.4.RELEASE.

2018-02-13 09:52:35.927  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2018-02-13 09:52:35.927  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2018-02-13 09:52:36.067  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 0 : {betsyncDataTopicNew123.5=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.272  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.5'
2018-02-13 09:52:36.372  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {betsyncDataTopicNew123.93=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.475  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.93'
2018-02-13 09:52:36.583  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 6 : {betsyncDataTopicNew123.4=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.686  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.4'
2018-02-13 09:52:36.794  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 9 : {betsyncDataTopicNew123.7=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:36.897  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.7'
2018-02-13 09:52:37.008  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 12 : {betsyncDataTopicNew123.0=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.110  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.0'
2018-02-13 09:52:37.220  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 15 : {betsyncDataTopicNew123.1=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.320  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.1'
2018-02-13 09:52:37.431  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 18 : {betsyncDataTopicNew123.10=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.534  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.10'
2018-02-13 09:52:37.534  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.4'
2018-02-13 09:52:37.642  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 22 : {betsyncDataTopicNew123.71=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.747  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.71'
2018-02-13 09:52:37.859  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 25 : {betsyncDataTopicNew123.2=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:37.962  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.2'
2018-02-13 09:52:37.963  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.7'
2018-02-13 09:52:38.074  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 29 : {betsyncDataTopicNew123.33=LEADER_NOT_AVAILABLE}
2018-02-13 09:52:38.176  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.33'
2018-02-13 09:52:38.176  INFO [betsync-adapter-service,,,] 22241 --- [      Thread-28] c.b.b.a.k.message.KafkaMessageSender     : sent payload to topic='betsyncDataTopicNew123.1'
2018-02-13 09:52:38.285  WARN [betsync-adapter-service,,,] 22241 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 33 : {betsyncDataTopicNew123.17=LEADER_NOT_AVAILABLE}

And this is just the warning, and we are not getting any exception, so have no idea if the message gets lost or not.


Solution

  • Use the AdminClient to create topics on-demand.

    Spring for Apache Kafka provides a convenient KafkaAdmin which can create topics for NewTopic beans in the application context, but it can also be used to create an AdminClient so you can manually create topics - docs here. Spring Boot 2.0 auto configuration registers one for you.

    Once you have an AdminClient instance, use one of the createTopics() methods. See the AdminClient javadocs for more information. Be sure to wait for the CreateTopicsResult's Future<?>s to complete.