Search code examples
spring-bootapache-kafkaspring-kafka

Spring Example HTTP Polling with Apache Kafka Request/Reply


I was asked this question out of band and thought I would post it here to help the community if similar requirements exist.

Paraphrasing:

I would like to poll an HTTP endpoint for a request/reply scenario, where the controller makes a request over Apache Kafka to a service that returns a reply, but I don't want to block in the controller waiting for the reply; I wish to poll the endpoint periodically to get the reply.


Solution

  • Use the ReplyingKafkaTemplate and the CompletableFuture it returns to asynchronously receive the reply and store it for use on the next HTTP poll:

    (Note that 3.0.x and above use CompletableFuture, 2.9.x returned a ListenableFuture, but similar semantics apply there).

    For simplicity, this example includes the server code (@KafkaListener) too.

    Note that the client must provide a unique token of some sort to correlate the request/reply.

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @Bean
        ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
                ConcurrentKafkaListenerContainerFactory<String, String> factory,
                KafkaTemplate<String, String> template) {
    
            factory.setReplyTemplate(template);
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("replies");
            container.getContainerProperties().setGroupId("replies");
            ReplyingKafkaTemplate<String, String, String> replier = new ReplyingKafkaTemplate<>(pf, container);
            replier.setDefaultTopic("requests");
            return replier;
        }
    
        @Bean
        KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
            return new KafkaTemplate<>(pf);
        }
    
        @Bean
        NewTopic topic1() {
            return TopicBuilder.name("requests").partitions(1).replicas(1).build();
        }
    
        @Bean
        NewTopic reply() {
            return TopicBuilder.name("replies").partitions(1).replicas(1).build();
        }
    
        @KafkaListener(id = "upperCaser", topics = "requests")
        @SendTo
        String upperCaseIt(String in) {
            System.out.println("Request: " + in);
            return in.toUpperCase();
        }
    
    }
    
    @RestController
    class Controller {
    
        private final ReplyingKafkaTemplate<String, String, String> template;
    
        private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
    
        private final Set<String> awaiting = ConcurrentHashMap.newKeySet();
    
        public Controller(ReplyingKafkaTemplate<String, String, String> template) {
            this.template = template;
        }
    
        @GetMapping(path = "/get/foo/{correlation}/{data}")
        public String get(@PathVariable String correlation, @PathVariable String data) {
            if (this.awaiting.add(correlation)) {
                RequestReplyMessageFuture<String, String> future =
                        this.template.sendAndReceive(MessageBuilder.withPayload(data)
                                .build());
                future.whenComplete((msg, ex) -> {
                    if (ex == null) {
                        this.map.put(correlation, (String) msg.getPayload());
                    }
                    else {
                        this.map.put(correlation, "Exception: " + ex.getMessage());
                    }
                });
            }
            String reply = this.map.remove(correlation);
            if (reply != null) {
                this.awaiting.remove(correlation);
                return reply + "\n";
            }
            else {
                return "no result yet\n";
            }
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    

    Result:

    % curl http://localhost:8080/get/foo/1/test
    no result yet
    % curl http://localhost:8080/get/foo/1/test
    TEST