Search code examples
apache-kafka apache-kafka-streams spring-kafka embedded-kafka

Trying to test spring + kafka embedded multiple consumers in group


I'm trying to test parallel consumption from a 3-partition topic in Kafka. I assumed that all consumers should be able to subscribe and process messages, but I get the exception:

java.lang.IllegalStateException: Failed to be assigned partitions from the embedded topics

and before the exception this is what I see (a lot of lines like this):

[ | common-group] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-common-group-1, groupId=common-group] Request joining group due to: group is already rebalancing
[ | common-group] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-common-group-1, groupId=common-group] Request joining group due to: group is already rebalancing

Here is the code:

@SpringBootTest(classes = StreamApp.class)
@EmbeddedKafka(partitions = 3,
        topics = {
                "${kafka-demo.topics.input.name}",
                "${kafka-demo.topics.output.name}"
        },
        brokerProperties = {
                "transaction.state.log.replication.factor=1",
                "offsets.topic.replication.factor=1",
                "transaction.state.log.min.isr=1"
        })
public class ScalingIT {
    @Autowired
    private KafkaTemplate<Object, Object> template;
    @Autowired
    private EmbeddedKafkaBroker broker;
    @Autowired
    private JsonSerde<Message> messageSerde;
    private List<Consumer<String, Message>> messageConsumers = new ArrayList<>();
    @Autowired
    private KafkaDemoProps props;

    @BeforeEach
    void setup() {
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
        this.messageConsumers.add(consumer(props.topics().output().name(), Serdes.String(), messageSerde));
    }

    @AfterEach
    public void teardown() {
        messageConsumers.forEach(c -> {
            if (c != null) {
                c.close();
            }
        });
    }

    private <K, V> Consumer<K, V> consumer(String topic, Serde<K> keySerde, Serde<V> valueSerde) {
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("common-group", "false", this.broker);
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

        DefaultKafkaConsumerFactory<K, V> kafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps, keySerde.deserializer(), valueSerde.deserializer());
        Consumer<K, V> consumer = kafkaConsumerFactory.createConsumer();
        this.broker.consumeFromAnEmbeddedTopic(consumer, topic);
        return consumer;
    }

    @Test
    @DirtiesContext
    void allSentMessagesGoVia3Consumers() {
        int numRecords = 100;
        IntStream.range(0,numRecords).forEach(i -> {
            template.send(props.topics().input().name(), String.format("{\"SomeProp\":\"%s\"}", i).getBytes());
        });
        ConsumerRecords<String, Message> records1 = KafkaTestUtils.getRecords(this.messageConsumers.get(0));
        ConsumerRecords<String, Message> records2 = KafkaTestUtils.getRecords(this.messageConsumers.get(1));
        ConsumerRecords<String, Message> records3 = KafkaTestUtils.getRecords(this.messageConsumers.get(2));
        List<ConsumerRecord<String, Message>> allRecords = new ArrayList<>();
        records1.forEach(allRecords::add);
        records2.forEach(allRecords::add);
        records3.forEach(allRecords::add);
        assertThat(allRecords.size()).isEqualTo(numRecords);
    }
}

Does anyone have any idea how to test multiple consumers in the same group?


Solution

  • It wasn't designed to be used that way; the problem is, when the second consumer polls, the first consumer needs to call poll() for the rebalance to complete.

    It was only intended to support a single consumer.

    You should be able to use manual assignment instead of group management.

    Replace

    this.broker.consumeFromAnEmbeddedTopic(consumer, topic);
    

    with

    consumer.assign(List.of(new TopicPartition(topic, this.partition++)))
    

    You will also need

    bootstrapServersProperty = "spring.kafka.bootstrap-servers"
    

    on the embedded broker so that the auto configured template will send the records there.

    EDIT

    I also changed your test to send a deterministic number of records to each partition and specifically to wait for that number to arrive...

    /**
     * Sends {@code numRecords} records round-robin to the partitions of the "output"
     * topic, then waits for each partition's consumer to receive exactly its share and
     * checks that the total matches.
     *
     * <p>Assumes {@code messageConsumers.get(p)} reads partition {@code p}.
     */
    @Test
    @DirtiesContext
    void allSentMessagesGoVia3Consumers() {
        int numRecords = 100;
        int partitions = 3;
        IntStream.range(0, numRecords).forEach(i ->
                template.send("output", i % partitions, null,
                        String.format("{\"SomeProp\":\"%s\"}", i)
                                .getBytes(java.nio.charset.StandardCharsets.UTF_8)));

        List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
        for (int partition = 0; partition < partitions; partition++) {
            // Number of i in [0, numRecords) with i % partitions == partition
            // (34, 33, 33 for 100 records over 3 partitions).
            int expected = (numRecords - partition + partitions - 1) / partitions;
            ConsumerRecords<String, String> records =
                    KafkaTestUtils.getRecords(this.messageConsumers.get(partition), 60_000, expected);
            records.forEach(allRecords::add);
        }
        assertThat(allRecords.size()).isEqualTo(numRecords);
    }