I am working on an application that already consumes and creates Kafka messages. Creating Listeners that are initialized at startup using @Component and consume whatever messages are send to their respective topic(s) is not an issue.
But for the import of a big number of entries into a database, I have to wait for a REST-call before I start to consume certain Kafka topics and filter for a specific search pattern provided by the REST-call. How can this be achived?
My idea was to do something like this:
public ResponseEntity<Result> restCall(String searchCriteria){
...
MyKafkaListener mkl = new MyKafkaListener(searchCriteria);
// Do whatever I need to do to get it running...
mkl.start();
...
}
I only found a very complex way using KafkaListenerEndpointRegistry and a bunch of factory classes that need implementing. Do I really have to implement all those interfaces to do what I plan or is there a simpler way to start Kafka consumers as needed?
EDIT:
One idea was to have a class that I could configure while it's already instanziated. My Listener could be like
@KafkaListener(id = "myListener", topics = { "wholeLottaTopics"}, autoStartup = "false", groupId = "${spring.kafka.groupid}")
public boolean onMessage(ConsumerRecord cr)
{...}
This Listener would be initialized during startup and I could get access to it and start it:
MessageListenerContainer myListener = kafkaListenerEndpointRegistry.getListenerContainer("myListener");
myListener.start();
But retrieving it this way, I cannot access it's fields like searchCriteria etc. Also, when I get multiple REST-requests that tell my service to import data from the same topics according to different criteria, I cannot run another Consumer at the same time.
I came up with the following solution, it's not exaclty what I need and will need some work, but this basically does what I need:
ConsumerConfig.java
@Configuration
public class ConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
String bootstrapAddress;
// Import all the other settings from application.yaml using @Value-notation
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// Create props for Factory
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
// ...
return new DefaultKafkaConsumerFactory<>(props);
}
}
Then, inside of my REST-Controller:
RestController.java
public class RestController{
@Autowired
ConsumerFactory<String, String> consumerFactory;
// ...
private void consumeKafka(String searchCriteria) {
Consumer<String, String> consumer = consumerFactory.createConsumer();
consumer.subscribe(Arrays.asList("a.bunch.of.topics"));
while(true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord cr : consumerRecords) {
// Convert cr to object, check whether object fits criteria, import or not...
}
}
}
}
This basically does the trick and can be invoked in parallel, but leaves my microservice with a bunch of open and never gracefully deserializing KafkaConsumers.
Thanks to @Mar-Z and @OneCricketeer for your input!