I was asked this question out of band and thought I would post it here to help the community if similar requirements exist.
Paraphrasing:
I would like to poll an HTTP endpoint for a request/reply scenario, where the controller makes a request over Apache Kafka to a service that returns a reply, but I don't want to block in the controller waiting for the reply; I wish to poll the endpoint periodically to get the reply.
Use the ReplyingKafkaTemplate
and the CompletableFuture
it returns to asynchronously receive the reply and store it for use on the next HTTP poll:
(Note that 3.0.x and above use CompletableFuture
, 2.9.x returned a ListenableFuture
, but similar semantics apply there).
For simplicity, this example includes the server code (@KafkaListener
) too.
Note that the client must provide a unique token of some sort to correlate the request/reply.
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
KafkaTemplate<String, String> template) {
factory.setReplyTemplate(template);
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("replies");
container.getContainerProperties().setGroupId("replies");
ReplyingKafkaTemplate<String, String, String> replier = new ReplyingKafkaTemplate<>(pf, container);
replier.setDefaultTopic("requests");
return replier;
}
@Bean
KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
NewTopic topic1() {
return TopicBuilder.name("requests").partitions(1).replicas(1).build();
}
@Bean
NewTopic reply() {
return TopicBuilder.name("replies").partitions(1).replicas(1).build();
}
@KafkaListener(id = "upperCaser", topics = "requests")
@SendTo
String upperCaseIt(String in) {
System.out.println("Request: " + in);
return in.toUpperCase();
}
}
@RestController
class Controller {
private final ReplyingKafkaTemplate<String, String, String> template;
private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
private final Set<String> awaiting = ConcurrentHashMap.newKeySet();
public Controller(ReplyingKafkaTemplate<String, String, String> template) {
this.template = template;
}
@GetMapping(path = "/get/foo/{correlation}/{data}")
public String get(@PathVariable String correlation, @PathVariable String data) {
if (this.awaiting.add(correlation)) {
RequestReplyMessageFuture<String, String> future =
this.template.sendAndReceive(MessageBuilder.withPayload(data)
.build());
future.whenComplete((msg, ex) -> {
if (ex == null) {
this.map.put(correlation, (String) msg.getPayload());
}
else {
this.map.put(correlation, "Exception: " + ex.getMessage());
}
});
}
String reply = this.map.remove(correlation);
if (reply != null) {
this.awaiting.remove(correlation);
return reply + "\n";
}
else {
return "no result yet\n";
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
Result:
% curl http://localhost:8080/get/foo/1/test
no result yet
% curl http://localhost:8080/get/foo/1/test
TEST