Tags: java, spring-kafka, spring-kafka-test

KafkaTestUtils.getRecords() or KafkaTestUtils.getSingleRecord() sometimes takes too long


I am testing a Spring Boot application that produces Kafka messages, and I have created a consumer in the tests to validate that the messages are sent properly. When I use KafkaTestUtils.getRecords() or KafkaTestUtils.getSingleRecord() in the test, the time it takes to receive the records varies a lot from run to run: sometimes it takes 1 second, other times 20 seconds. Is this expected? Is there any way to improve performance?

This is the test:

    void ProducerTest() {
        // given
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka running locally in Docker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DTODeserializer.class);
        ConsumerFactory<String, DTO> cf = new DefaultKafkaConsumerFactory<>(props);
        Consumer<String, DTO> consumerTest = cf.createConsumer();
        consumerTest.subscribe(Collections.singleton("topic"));

        // when
        // call to the REST API that produces the Kafka message in "topic"

        // then
        ConsumerRecords<String, DTO> records = KafkaTestUtils.getRecords(consumerTest);
        // assert
    }

Solution

  • It must be a problem with either your Docker container or your producer; this runs consistently (in less than half a second) for me with a local broker.

    @Test
    void ProducerTest() throws Exception {
        // given
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);
        Consumer<String, String> consumerTest = cf.createConsumer();
        consumerTest.subscribe(Collections.singleton("topic"));
    
        // when
        props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.send("topic", "foo").get(10, TimeUnit.SECONDS);
    
        // then
        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumerTest);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record).isNotNull();
        assertThat(record.value()).isEqualTo("foo");
    
        pf.reset();
        consumerTest.close();
    }
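
To narrow down whether the time is lost on the producer side or in the getRecords() call, one option is to time each step of the test separately. The following is only a rough sketch: StepTimer and its time() method are hypothetical names introduced here for illustration and are not part of spring-kafka-test.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of spring-kafka-test): measures how long one test step takes.
final class StepTimer {

    /** Holds the value a step returned together with its elapsed wall-clock time. */
    static final class Timed<T> {
        final T value;
        final long elapsedMillis;

        Timed(T value, long elapsedMillis) {
            this.value = value;
            this.elapsedMillis = elapsedMillis;
        }
    }

    private StepTimer() {
    }

    /** Runs the step, prints its duration under the given label, and returns both. */
    static <T> Timed<T> time(String label, Callable<T> step) {
        long start = System.nanoTime();
        T value;
        try {
            value = step.call();
        } catch (Exception e) {
            // Wrap checked exceptions so the helper can be called from any test method.
            throw new IllegalStateException(label + " failed", e);
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(label + " took " + elapsedMillis + " ms");
        return new Timed<>(value, elapsedMillis);
    }
}
```

In the test above, the send and the receive could then be wrapped individually, for example StepTimer.time("send", () -> template.send("topic", "foo").get(10, TimeUnit.SECONDS)) and StepTimer.time("getRecords", () -> KafkaTestUtils.getRecords(consumerTest)).value. If the slow runs show up in the "send" step, the producer or the broker is the more likely culprit. If they show up in "getRecords", the consumer side (subscription and partition assignment) is worth a closer look.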