Search code examples
spring-bootspring-kafkapartitioningembedded-kafka

Integration test using EmbeddedKafka: ContainerTestUtil.waitForAssignment throws Expected 1 but got 0 partitions


I have written an integration test for my kafka consumer using spring boot, with the spring-kafka libraries. This test uses EmbeddedKafka. A topic with one partition is used. I used the KafkaMessageListener container for this. But I am getting an error in this line

ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic())

The error that I am getting is:

java.lang.IllegalStateException: Expected 1 but got 0 partitions.

The code which I referred to is: https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/

@EmbeddedKafka (partitions 1, ports = 9092) 
@SpringBoot Test (properties="spring.kafka.bootstrap-servers=${spring.embedded.kakfa.brokers}")

@RunWith(SpringRunner.class)

@DirtiesContext

@Profile("test") 
@TestPropertySource({"classpath:application.yaml"}}

public class Onboarding ConsumerListenerTest {

BlockingQueue<ConsumerRecord<String, String> records;

KafkaMessageListenerContainer<String, String> container;

@Autowired

protected EmbeddedkafkaBroker embeddedKafkaBroker;

public ConsumerFactory<String, String> consumerFactory;

@Value("${spring.kafka.client_topic}")

private String topicName;

@Value("${spring.kafka.group_id})
private String groupId;

@Before
public void setUp(){
    
    consumerFactory = getKafkaConsumer(embeddedKafkaBroker, groupId, topicName);
    ContainerProperties containerProperties = new ContainerProperties(topicName);
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    consumerRecords = new LinkedBlockingQueue<>();
    container.setupMessageListener(new MessageListener<String, String>(){
        @Override
        public void onMessage(ConsumerRecord<String, String> record){
            records.add(record);
        }
    });
    
    container.start();
    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
}

And the getKafkaConsumer() function is:

public ConsumerFactory<String, String> 
                             getKafkaConsumer(EmbeddedKafkaBroker 
                               embeddedKafkaBroker,
                               String group,
                               String topic){
        Map<string, Object> consumerProps = 
        KafkaTestUtils.consumerProps(group, "false", 
                                     embeddedkafkaNroker);
       consumerProps.put(ConsumerConfig.BOOSTRAP_SERVER_CONFIG, 
                           embeddedKafkaBroker.getBrokerAsString());
       consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
       consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_COFIG, KafkaAvroDeserializer.class);
       consumerProps.put("schema.registry.url", "bogus");
       consumerProps.put("specific.avro.reader", true);
       ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProps);
       return consumerFactory;
}

Solution

  • It is hard to say what is happening without more information.

    An alternative is to manually assign the partitions instead of waiting for group management to assign them:

    ContainerProperties containerProperties = 
        new ContainerProperties(new TopicPartitionOffset(topicName, 0), 
                                new TopicPartitionOffset(topicName, 1));