I followed the Spring Kafka doc to create a request/reply pattern application.
The client uses a ReplyingKafkaTemplate to send the request and receive the reply:
@SpringBootApplication
@Slf4j
public class PingApplication {

    public static final String TOPIC_PINGPONG = "pingpong";

    public static void main(String[] args) {
        SpringApplication.run(PingApplication.class, args);
    }

    @Bean
    ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> producerFactory,
            GenericMessageListenerContainer<String, String> listenerContainer
    ) {
        var template = new ReplyingKafkaTemplate<String, String, String>(producerFactory, listenerContainer);
        // template.setSharedReplyTopic(false);
        // template.setDefaultReplyTimeout(Duration.ofSeconds(5));
        return template;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> listenerContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                containerFactory.createContainer("pingpong");
        listenerContainer.getContainerProperties().setGroupId("pingpongGroup");
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }

    @Bean
    @SneakyThrows
    RouterFunction<ServerResponse> router(ReplyingKafkaTemplate<String, String, String> template) {
        return route()
                .GET("/",
                        req -> {
                            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_PINGPONG, "ping");
                            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
                            replyFuture.addCallback(
                                    result -> {
                                        log.info("callback result: {}", result);
                                    },
                                    ex -> {
                                        log.info("callback ex: {}", ex.getMessage());
                                    }
                            );
                            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                            log.info("Sent ok: {}", sendResult.toString());
                            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
                            log.info("Return value: {}->{}", consumerRecord.key(), consumerRecord.value());
                            return ok().body(consumerRecord.value());
                        }
                )
                .build();
    }
}
The server-side application looks like:
@SpringBootApplication
public class PongApplication {

    public static final String TOPIC_PINGPONG = "pingpong";

    public static void main(String[] args) {
        new SpringApplicationBuilder(PongApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }
}

@Component
@Slf4j
@RequiredArgsConstructor
class PongHandler {

    @KafkaListener(groupId = "server", containerGroup = "pingpongGroup", topics = PongApplication.TOPIC_PINGPONG)
    @SendTo // use default replyTo expression
    public String handle(String request) {
        log.info("Received: {} in {}", request, this.getClass().getName());
        return "pong at " + LocalDateTime.now();
    }
}
I started the client and server applications separately and hit http://localhost:8080 to send a message.
The client application's console shows:
2020-09-02 13:57:30.116 INFO 17544 --- [nio-8080-exec-1] com.example.demo.ping.PingApplication : Return value: null->ping
2020-09-02 13:57:30.292 ERROR 17544 --- [Container-0-C-1] o.s.k.r.ReplyingKafkaTemplate : No pending reply: ConsumerRecord(topic = pingpong, partition = 5, leaderEpoch = 0, offset = 5, CreateTime = 1599026250257, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-52, 91, -50, -111, -85, 125, 73, 66, -117, -6, 22, -66, 9, -58, 78, -104])], isReadOnly = false), key = null, value = pong at 2020-09-02T13:57:30.162439500) with correlationId: [-68643167049153640897138814702361096552], perhaps timed out, or using a shared reply topic
The consumer record value is the original request ("ping"), not the value returned by the server; the server's actual reply ("pong at ...") only shows up inside the "No pending reply" error.
Update: I fixed it following Gary's tips, by using a separate topic for the replies:
@Bean
public ConcurrentMessageListenerContainer<String, String> listenerContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
    ConcurrentMessageListenerContainer<String, String> listenerContainer =
            containerFactory.createContainer("replies"); // Use a different topic name.
    listenerContainer.getContainerProperties().setGroupId("repliesGroup");
    listenerContainer.setAutoStartup(false);
    return listenerContainer;
}
pingpong - you can't use the same topic for requests and replies; if you do, the sender will get a copy of the request as well as the reply. Hence the "No pending reply" log message.
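To make the mechanism concrete, here is a minimal plain-Java sketch of correlation-based reply matching. It is a simplified model, not Spring Kafka's actual implementation: the `Broker`, `Message`, and `roundTrip` names are hypothetical, delivery is synchronous and in-memory, and the only assumption carried over from Kafka is that consumers in different groups each receive every record on a topic they subscribe to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class SharedTopicDemo {

    /** Hypothetical stand-in for a Kafka record with correlation and reply-topic headers. */
    static final class Message {
        final String topic;
        final String correlationId;
        final String replyTopic; // null on replies: only requests say where to answer
        final String value;

        Message(String topic, String correlationId, String replyTopic, String value) {
            this.topic = topic;
            this.correlationId = correlationId;
            this.replyTopic = replyTopic;
            this.value = value;
        }
    }

    /** Hypothetical in-memory broker: every subscriber of a topic sees every record. */
    static final class Broker {
        private final Map<String, List<Consumer<Message>>> listeners = new HashMap<>();

        void subscribe(String topic, Consumer<Message> listener) {
            listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
        }

        void send(Message message) {
            for (Consumer<Message> listener : List.copyOf(listeners.getOrDefault(message.topic, List.of()))) {
                listener.accept(message);
            }
        }
    }

    /** Sends one "ping" and returns the log lines the client would produce. */
    static List<String> roundTrip(String requestTopic, String replyTopic) {
        List<String> log = new ArrayList<>();
        Broker broker = new Broker();
        Map<String, CompletableFuture<String>> pending = new HashMap<>();

        // Client side: the reply container completes whichever future matches the correlation id.
        broker.subscribe(replyTopic, m -> {
            CompletableFuture<String> future = pending.remove(m.correlationId);
            if (future != null) {
                future.complete(m.value);
            } else {
                log.add("No pending reply: " + m.value);
            }
        });

        // Server side: answers any record that names a reply topic, echoing the correlation id.
        broker.subscribe(requestTopic, m -> {
            if (m.replyTopic != null) {
                broker.send(new Message(m.replyTopic, m.correlationId, null, "pong"));
            }
        });

        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        broker.send(new Message(requestTopic, correlationId, replyTopic, "ping"));

        log.add(0, "Return value: " + future.getNow("<no reply>"));
        return log;
    }

    public static void main(String[] args) {
        // Shared topic: the client consumes its own request first, so the real reply has no pending future.
        System.out.println(roundTrip("pingpong", "pingpong")); // [Return value: ping, No pending reply: pong]
        // Separate reply topic: only the server's reply reaches the client.
        System.out.println(roundTrip("pingpong", "replies"));  // [Return value: pong]
    }
}
```

In the shared-topic run, the request itself carries the correlation id, so the client's reply listener treats it as the answer and completes the future with "ping". When the real "pong" arrives there is nothing left to match, which mirrors the two log lines in the question.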