spring kotlin apache-kafka spring-kafka kafka-producer-api

Error when creating a topic with a chosen partition in Spring Kafka


I'm learning Kafka with Spring Kafka in Kotlin. I understand that when a message is published to a topic that does not exist, the topic is created automatically. When I send a value to a topic created this way from Spring, it goes to the default partition 0, but I want to write a message to another partition, for example partition 1.

When I write to the topic without specifying a partition, it works:

val topicTesteKotlin = "topico-teste-kotlin"

fun sendTopicCallback(@PathVariable message: String): ResponseEntity<String> {
    val msg = Optional.of(message)
    return if (msg.isPresent) {
        kafkaTemplate.send(topicTesteKotlin, message).addCallback({
            println("Sent message=[" + message +
                    "] with offset=[" + it!!.recordMetadata.offset() + "]")
        }, {
            println("Unable to send message=[" +
                    message + "] due to : " + it.message)
        })
        ResponseEntity.ok(msg.get())
    } else {
        kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
        ResponseEntity.badRequest().body("Bad request!")
    }
}

But when I choose the partition and key using:

val topicTesteKotlin = "topico-teste-kotlin"

fun sendTopicCallback(@PathVariable message: String): ResponseEntity<String> {
    val msg = Optional.of(message)
    return if (msg.isPresent) {
        kafkaTemplate.send(topicTesteKotlin, 1, "1", message).addCallback({
            println("Sent message=[" + message +
                    "] with offset=[" + it!!.recordMetadata.offset() + "]")
        }, {
            println("Unable to send message=[" +
                    message + "] due to : " + it.message)
        })
        ResponseEntity.ok(msg.get())
    } else {
        kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
        ResponseEntity.badRequest().body("Bad request!")
    }
}

I get the following error:

org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).

I tried changing the key to "0.1", but that does not work either. Apparently, when a topic is created from the Spring client, only one partition is created, and that is partition 0.
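
For reference, the range in the error message is half-open: a topic with N partitions accepts the indices 0 through N-1, so a one-partition topic accepts only partition 0. The plain-Java sketch below only mimics that rule to make the message easier to read. It is not Kafka's actual code, and the names `PartitionRangeCheck` and `validate` are hypothetical.

```java
public class PartitionRangeCheck {

    // Hypothetical helper: a partition index must lie in the half-open range [0, partitionCount).
    public static void validate(int partition, int partitionCount) {
        if (partition < 0 || partition >= partitionCount) {
            throw new IllegalArgumentException(
                    "Invalid partition given with record: " + partition
                            + " is not in the range [0..." + partitionCount + ").");
        }
    }

    public static void main(String[] args) {
        validate(0, 1); // accepted: a one-partition topic only has partition 0
        validate(1, 2); // accepted: a two-partition topic has partitions 0 and 1
        try {
            validate(1, 1); // rejected: the same situation as in the question
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this reading, changing the record key cannot help, because the key is not part of the check when a partition is given explicitly; the topic itself needs at least two partitions.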

Kafka Producer config

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory() : ProducerFactory<String, String> {
        val configProps = HashMap<String,Any>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate() : KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

}

So, how do I create partitions from the Spring Kafka client?


Solution

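  • The topic was created with a single partition, which is why the error reports the valid range as [0...1): only partition 0 exists. To control the number of partitions, declare the topic yourself with a KafkaAdmin bean and a NewTopic bean: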

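    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;

    @Configuration
    public class KafkaTopicConfig {

        // Broker address, read from the kafka.bootstrapAddress property (for example localhost:9092)
        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;

        private String testTopicName = "topico-teste-kotlin";

        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            return new KafkaAdmin(configs);
        }

        @Bean
        public NewTopic testTopic() {
            // Arguments: topic name, number of partitions, replication factor
            return new NewTopic(testTopicName, 2, (short) 1);
        }
    }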
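
As a variation on the configuration above, Spring Kafka 2.3 and later also provides `TopicBuilder`, which names each setting instead of relying on constructor argument order. This is a minimal sketch, assuming the broker runs at `localhost:9092` as in the question's producer config; the class name `KafkaTopicBuilderConfig` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicBuilderConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        // Assumption: same broker address as in the question's producer config
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic testTopic() {
        // Two partitions (indices 0 and 1), replication factor 1 for a single-broker setup
        return TopicBuilder.name("topico-teste-kotlin")
                .partitions(2)
                .replicas(1)
                .build();
    }
}
```

With two partitions declared, the call `kafkaTemplate.send(topicTesteKotlin, 1, "1", message)` from the question targets an existing partition. In a Spring Boot application a `KafkaAdmin` bean is registered automatically, so the `NewTopic` bean alone is usually enough there.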