I recently wanted to add a new behavior in my project that uses spring-kafka.
The idea is really simple :
The goal is to create dynamically new topics and consumer. I cannot use spring kafka annotation since I need it to be dynamic so I did this :
@KafkaListener(topics = ScenarioTopics.NEW_SCENARIO)
public void receive(final String topic) {
logger.info("Get new scenario " + topic + ", creating new consumer");
TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset(
"APP2_" + topic, 1, 0L);
ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
containerProps.setMessageListener((MessageListener<Object, String>) message -> {
// process my message
});
KafkaMessageListenerContainer<Object, String> container = new KafkaMessageListenerContainer<>(kafkaPeopleConsumerFactory, containerProps);
container.start();
}
And this does not work. I'm missing probably something, but I can't figure what.
Here I have some logs that tells me that the leader is not available, which is weird since I got the new scenario event.
2022-03-14 18:08:26.057 INFO 21892 --- [ntainer#0-0-C-1] o.l.b.v.c.c.i.k.KafkaScenarioListener : Get new scenario W4BdDBEowY, creating new consumer
2022-03-14 18:08:26.061 INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
[...lot of things...]
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1647277706067
2022-03-14 18:08:26.068 INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Subscribed to partition(s): PEOPLE_W4BdDBEowY-1
2022-03-14 18:08:26.072 INFO 21892 --- [ -C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Seeking to offset 0 for partition PEOPLE_W4BdDBEowY-1
2022-03-14 18:08:26.081 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 2 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
2022-03-14 18:08:26.081 INFO 21892 --- [ -C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Cluster ID: ebyKy-RVSRmUDaaeQqMaQg
2022-03-14 18:18:04.882 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5314 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
2022-03-14 18:18:04.997 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5315 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
How do I create dynamically a kafka consumer on a topic ? I think I do it very wrong, but I searched a lot and really didn't find anything.
There are several answers here about dynamically creating containers...
Trigger one Kafka consumer by using values of another consumer In Spring Kafka
Kafka Consumer in spring can I re-assign partitions programmatically? Create consumer dynamically spring kafka
Dynamically start and off KafkaListener just to load previous messages at the start of a session