Why doesn't a KafkaConsumer throw an error when a non-existing topic is assigned?


I have written a KafkaConsumer. The configuration looks like this:

  @Bean
  Map<String, Object> consumerConfig(
      @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
    return Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
        "earliest",
        ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
        false,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
  }

When I assign a topic that does not exist to the KafkaConsumer, no error is thrown. This is the code:

var topicPartition = new TopicPartition("75757584959595943", key);
var partitions = Set.of(topicPartition);
consumer.assign(partitions);
for (var records = consumer.poll(Duration.ZERO); !records.isEmpty(); ) {
  // ...

Why does the KafkaConsumer not alert me about a non-existing topic? Wouldn't this be helpful?


Solution

  • It does alert you: in the client logs you'll see messages containing the error code UNKNOWN_TOPIC_OR_PARTITION.

    It's not a fatal exception. The consumer will continue to fetch cluster metadata and wait until the topic exists, and then poll it when it does.

    If you'd like Spring to create the topic, define a NewTopic bean for it.

    If you'd like to check for topic existence and throw your own exception, use an AdminClient and its describeTopics method.
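
A minimal sketch of the AdminClient check, assuming kafka-clients 3.1 or newer (older versions expose all() instead of allTopicNames()). The class name TopicChecks, the method name requireTopicExists, and the choice of IllegalStateException are illustrative, not part of any Kafka or Spring API:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Hypothetical helper class; the name is an assumption for this example.
public final class TopicChecks {

  private TopicChecks() {}

  /** Throws IllegalStateException if the cluster reports the topic as unknown. */
  public static void requireTopicExists(String bootstrapServers, String topic)
      throws InterruptedException, ExecutionException {
    Map<String, Object> config =
        Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    // AdminClient is AutoCloseable, so try-with-resources releases its threads.
    try (AdminClient admin = AdminClient.create(config)) {
      // describeTopics is asynchronous; get() blocks until the result arrives.
      admin.describeTopics(Set.of(topic)).allTopicNames().get();
    } catch (ExecutionException e) {
      // A missing topic surfaces as the cause of the failed future.
      if (e.getCause() instanceof UnknownTopicOrPartitionException) {
        throw new IllegalStateException("Topic does not exist: " + topic, e);
      }
      throw e; // any other failure (timeout, authorization, ...) is passed on
    }
  }
}
```

Calling TopicChecks.requireTopicExists(bootstrapServers, "75757584959595943") before consumer.assign(partitions) would then fail fast instead of polling a topic that is not there. The check is only a snapshot: the topic can still be created or deleted between the check and the first poll.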