spring-boot, apache-kafka, kafka-consumer-api, spring-kafka, spring-web

REST controller to return records in Kafka via Spring Kafka


For my demo application, I have to create a REST controller that returns the message in the Kafka queue. I have read the spring-kafka reference guide, implemented the consumer configuration, and created the beans as below:

@Configuration
@EnableKafka
public class ConsumerConfiguration {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx");

        return props;
    }

    @Bean
    public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Transaction.class)
        );
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Transaction>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(transactionConsumerFactory());

        return factory;
    }

    @Bean
    public Consumer consumer() {
        return new Consumer();
    }

}

and another class, Consumer, as shown below:

public class Consumer {

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${kafka.topic.name}")
    public void receive(Transaction transaction) {
        latch.countDown();
    }
}

How can I now implement the logic to retrieve a transaction from the consumer on each GET request to the controller?

Thanks in advance.


Solution

  • The @KafkaListener starts an independent, long-lived process that streams records from Kafka to the callback. Since you want to react to a REST GET request, you have no choice other than to obtain a KafkaConsumer from the ConsumerFactory and call its poll() manually from the controller method.
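
A minimal sketch of that approach might look like the following. It is not a definitive implementation: the class name TransactionController, the /transactions path, and the 5-second poll timeout are assumptions made for illustration, while the ConsumerFactory bean and the kafka.topic.name property are taken from the question. It creates a short-lived consumer per request because a KafkaConsumer is not safe to share between threads.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Note: this import shadows the question's own Consumer class inside this file.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller; the name and the mapping path are assumptions.
@RestController
public class TransactionController {

    private final ConsumerFactory<String, Transaction> consumerFactory;
    private final String topic;

    public TransactionController(ConsumerFactory<String, Transaction> consumerFactory,
                                 @Value("${kafka.topic.name}") String topic) {
        this.consumerFactory = consumerFactory;
        this.topic = topic;
    }

    @GetMapping("/transactions")
    public List<Transaction> transactions() {
        // KafkaConsumer is not thread-safe, so create one per request and
        // close it (try-with-resources) when the request is done.
        try (Consumer<String, Transaction> consumer = consumerFactory.createConsumer()) {
            consumer.subscribe(Collections.singletonList(topic));

            // Wait up to 5 seconds (an assumed value) for records. The result
            // may be empty if the group join has not completed in time.
            ConsumerRecords<String, Transaction> records = consumer.poll(Duration.ofSeconds(5));

            List<Transaction> result = new ArrayList<>();
            for (ConsumerRecord<String, Transaction> record : records) {
                result.add(record.value());
            }

            // Commit the returned offsets so the next request continues after them.
            consumer.commitSync();
            return result;
        }
    }
}
```

Two caveats apply to this sketch. If the @KafkaListener from the question stays active with the same group id ("trx"), it competes with this consumer for the topic's partitions, so the listener would probably need to be removed or given a different group. Creating a consumer per request also triggers a group rebalance each time, so a long-lived consumer guarded by a lock may be a better fit when requests are frequent.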