Tags: java, spring-boot, apache-kafka, spring-kafka

How can I create a KafkaConsumer during runtime? (In Spring Boot)


I am working on an application that already consumes and produces Kafka messages. Creating Listeners that are initialized at startup using @Component and consume whatever messages are sent to their respective topic(s) is not an issue.

But for the import of a large number of entries into a database, I have to wait for a REST call before I start consuming certain Kafka topics and filtering for a specific search pattern provided by that REST call. How can this be achieved?

My idea was to do something like this:

public ResponseEntity<Result> restCall(String searchCriteria){
  ...
  MyKafkaListener mkl = new MyKafkaListener(searchCriteria);
  // Do whatever I need to do to get it running...
  mkl.start();
  ...
}

I only found a very complex way using KafkaListenerEndpointRegistry and a bunch of factory classes that need implementing. Do I really have to implement all those interfaces to do what I plan, or is there a simpler way to start Kafka consumers as needed?

EDIT:

One idea was to have a class that I could configure while it's already instantiated. My Listener could look like

@KafkaListener(id = "myListener", topics = { "wholeLottaTopics" }, autoStartup = "false", groupId = "${spring.kafka.groupid}")
public boolean onMessage(ConsumerRecord<String, String> cr)
{...}

This Listener would be initialized during startup and I could get access to it and start it:

MessageListenerContainer myListener = kafkaListenerEndpointRegistry.getListenerContainer("myListener");
myListener.start();

But retrieving it this way, I cannot access its fields like searchCriteria etc. Also, when I get multiple REST requests telling my service to import data from the same topics according to different criteria, I cannot run another consumer at the same time.
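A sketch of one way around both limitations (this is an assumption on my part, not part of the answer below): instead of one annotated listener, create a fresh KafkaMessageListenerContainer per request from a shared ConsumerFactory, capturing the search criteria in the listener. The class name, topic name, and criteria check here are placeholders.

```java
// Sketch: per-request listener container built from a shared ConsumerFactory.
// Assumes spring-kafka on the classpath; "some.topic" and matches() are placeholders.
import java.util.UUID;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class DynamicConsumerStarter {

    private final ConsumerFactory<String, String> consumerFactory;

    public DynamicConsumerStarter(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /** Start an independent container for this request; each call gets its own consumer. */
    public KafkaMessageListenerContainer<String, String> start(String searchCriteria) {
        ContainerProperties props = new ContainerProperties("some.topic");
        props.setGroupId("import-" + UUID.randomUUID()); // separate group per request
        props.setMessageListener((MessageListener<String, String>) record -> {
            if (matches(record.value(), searchCriteria)) {
                // import the record...
            }
        });
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, props);
        container.start();
        return container; // the caller should container.stop() when the import is done
    }

    // Placeholder criteria check; substitute the real filtering logic.
    static boolean matches(String value, String criteria) {
        return value != null && value.contains(criteria);
    }
}
```

Because each call builds its own container with its own captured criteria, several imports can run in parallel, and each can be stopped individually.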


Solution

  • I came up with the following solution. It's not exactly what I need and will need some work, but it basically does what I need:

    KafkaConsumerConfig.java

    @Configuration
    public class KafkaConsumerConfig { // renamed: "ConsumerConfig" would shadow org.apache.kafka.clients.consumer.ConsumerConfig used below
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        String bootstrapAddress;
        // Import all the other settings from application.yaml using @Value notation

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            // Build the properties map for the factory
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            // ...

            return new DefaultKafkaConsumerFactory<>(props);
        }
    }
    

    Then, inside my REST controller:

    ImportController.java

    @RestController
    public class ImportController { // renamed from "RestController", which would clash with the annotation's simple name
        @Autowired
        ConsumerFactory<String, String> consumerFactory;
        // ...
        private void consumeKafka(String searchCriteria) {
            Consumer<String, String> consumer = consumerFactory.createConsumer();
            consumer.subscribe(Arrays.asList("a.bunch.of.topics"));
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> cr : consumerRecords) {
                    // Convert cr to an object, check whether it fits the criteria, import or skip...
                }
            }
        }
    }
    

    This basically does the trick and can be invoked in parallel, but it leaves my microservice with a bunch of open KafkaConsumers that are never gracefully closed.
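One way to address that cleanup problem (a sketch on my part, not part of the answer above): guard the poll loop with a stop flag, interrupt a blocking poll() with consumer.wakeup() (the one Consumer method documented as safe to call from another thread), and close the consumer in a finally block. The topic name and criteria check are placeholders.

```java
// Sketch: a poll loop that can be stopped from outside and always closes its consumer.
// Assumes kafka-clients on the classpath; topic name and matches() are placeholders.
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

public class StoppableImport {

    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile Consumer<String, String> consumer;

    public void run(Consumer<String, String> consumer, String searchCriteria) {
        this.consumer = consumer;
        try {
            consumer.subscribe(Arrays.asList("a.bunch.of.topics"));
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> cr : records) {
                    if (matches(cr.value(), searchCriteria)) {
                        // import the record...
                    }
                }
            }
        } catch (WakeupException e) {
            // Expected when stop() interrupts a blocking poll; nothing to do.
        } finally {
            consumer.close(); // leave the consumer group gracefully
        }
    }

    /** Call from another thread, e.g. when the import is done or on shutdown. */
    public void stop() {
        running.set(false);
        if (consumer != null) {
            consumer.wakeup(); // thread-safe; interrupts a currently blocking poll()
        }
    }

    // Placeholder criteria check; substitute the real filtering logic.
    static boolean matches(String value, String criteria) {
        return value != null && value.contains(criteria);
    }
}
```

The loop itself would still need to run off the request thread (e.g. via a TaskExecutor), with stop() invoked once the import completes.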

    Thanks to @Mar-Z and @OneCricketeer for your input!