
ReplyingKafkaTemplate: No pending reply in a request/reply Spring Boot application


I followed the Spring Kafka doc to create a request/reply pattern application.

The client uses a ReplyingKafkaTemplate to send the request and receive the reply.

@SpringBootApplication
@Slf4j
public class PingApplication {

    public static final String TOPIC_PINGPONG = "pingpong";


    public static void main(String[] args) {
        SpringApplication.run(PingApplication.class, args);
    }

    @Bean
    ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
            ProducerFactory<String, String> producerFactory,
            GenericMessageListenerContainer<String, String> listenerContainer
    ) {
        var template= new ReplyingKafkaTemplate<String, String, String>(producerFactory, listenerContainer);
//        template.setSharedReplyTopic(false);
//        template.setDefaultReplyTimeout(Duration.ofSeconds(5));
        return template;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> listenerContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                containerFactory.createContainer("pingpong");
        listenerContainer.getContainerProperties().setGroupId("pingpongGroup");
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }

    @Bean
    @SneakyThrows
    RouterFunction<ServerResponse> router(ReplyingKafkaTemplate<String, String, String> template) {
        return route()
                .GET("/",
                        req -> {
                            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_PINGPONG, "ping");
                            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
                            replyFuture.addCallback(
                                    result -> {
                                        log.info("callback result: {}", result);
                                    },
                                    ex -> {
                                        log.info("callback ex: {}", ex.getMessage());
                                    }
                            );
                            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
                            log.info("Sent ok: {}", sendResult.toString());
                            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
                            log.info("Return value: {}->{}", consumerRecord.key(), consumerRecord.value());

                            return ok().body(consumerRecord.value());
                        }
                )
                .build();
    }

}

The server-side application looks like:

@SpringBootApplication
public class PongApplication {
    public static final String TOPIC_PINGPONG = "pingpong";


    public static void main(String[] args) {
        new SpringApplicationBuilder(PongApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }
}

@Component
@Slf4j
@RequiredArgsConstructor
class PongHandler {
    @KafkaListener(groupId = "server", containerGroup = "pingpongGroup", topics = PongApplication.TOPIC_PINGPONG)
    @SendTo // use default replyTo expression
    public String handle(String request) {
        log.info("Received: {} in {}", request, this.getClass().getName());
        return "pong at " + LocalDateTime.now();
    }
}

I start the client and server applications separately, then hit http://localhost:8080 to send a message.

The client application's console shows:

2020-09-02 13:57:30.116  INFO 17544 --- [nio-8080-exec-1] com.example.demo.ping.PingApplication    : Return value: null->ping
2020-09-02 13:57:30.292 ERROR 17544 --- [Container-0-C-1] o.s.k.r.ReplyingKafkaTemplate            : No pending reply: ConsumerRecord(topic = pingpong, partition = 5, leaderEpoch = 0, offset = 5, CreateTime = 1599026250257, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-52, 91, -50, -111, -85, 125, 73, 66, -117, -6, 22, -66, 9, -58, 78, -104])], isReadOnly = false), key = null, value = pong at 2020-09-02T13:57:30.162439500) with correlationId: [-68643167049153640897138814702361096552], perhaps timed out, or using a shared reply topic

The consumer record value is the request message I sent ("ping"), not the value returned by the server (the "pong at ..." value, which only shows up in the error message).

Update: I fixed it by following Gary's tips and using a separate topic for replies.

    @Bean
    public ConcurrentMessageListenerContainer<String, String> listenerContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                containerFactory.createContainer("replies"); // Use a different topic name for replies.
        listenerContainer.getContainerProperties().setGroupId("repliesGroup");
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }

Solution

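  • pingpong - you can't use the same topic for requests and replies; if you do, the sender's reply container will get a copy of the request as well as the reply. The request copy carries the same correlation id, so it is treated as the reply (hence "Return value: null->ping"); when the real reply arrives, no request is pending any more. Hence that "No pending reply" log message.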
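
To make the sequence of events concrete, here is a minimal plain-Java sketch of correlation-based reply matching. It is a toy model, not Spring Kafka's actual implementation: the class ReplyCorrelationSketch, its Message type, and the roundTrip helper are hypothetical names invented for this illustration, and the "broker" is just two method calls in a fixed order. It assumes the request record reaches the client's reply consumer before the server's reply does, which matches the log in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Toy model of correlation-based request/reply. This is NOT Spring Kafka code;
// every name here is hypothetical and exists only for illustration.
class ReplyCorrelationSketch {

    // A record as a consumer sees it: topic, correlation id, and payload.
    static final class Message {
        final String topic;
        final String correlationId;
        final String value;

        Message(String topic, String correlationId, String value) {
            this.topic = topic;
            this.correlationId = correlationId;
            this.value = value;
        }
    }

    private final String replyTopic;
    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new HashMap<>();
    final List<String> errors = new ArrayList<>();

    ReplyCorrelationSketch(String replyTopic) {
        this.replyTopic = replyTopic;
    }

    // Registers the request as pending and returns the future for its reply.
    CompletableFuture<String> sendAndReceive(Message request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(request.correlationId, future);
        return future;
    }

    // Invoked for each record published; only reply-topic records are consumed.
    void onMessage(Message message) {
        if (!message.topic.equals(replyTopic)) {
            return; // the reply consumer is not subscribed to this topic
        }
        CompletableFuture<String> future = pending.remove(message.correlationId);
        if (future == null) {
            errors.add("No pending reply: " + message.value);
        } else {
            future.complete(message.value);
        }
    }

    // Runs one round trip and returns the value the caller ends up with.
    static String roundTrip(String requestTopic, String replyTopic, List<String> errorsOut) {
        ReplyCorrelationSketch client = new ReplyCorrelationSketch(replyTopic);
        Message request = new Message(requestTopic, "id-1", "ping");
        CompletableFuture<String> future = client.sendAndReceive(request);

        // The server echoes the correlation id back on the reply topic.
        Message reply = new Message(replyTopic, request.correlationId, "pong");
        client.onMessage(request); // delivered first
        client.onMessage(reply);   // delivered second

        errorsOut.addAll(client.errors);
        return future.getNow(null);
    }

    public static void main(String[] args) {
        List<String> shared = new ArrayList<>();
        System.out.println(roundTrip("pingpong", "pingpong", shared) + " " + shared);
        // prints: ping [No pending reply: pong]

        List<String> separate = new ArrayList<>();
        System.out.println(roundTrip("pingpong", "replies", separate) + " " + separate);
        // prints: pong []
    }
}
```

With the same topic on both sides, the caller gets its own "ping" back and the genuine reply is rejected, mirroring the two log lines in the question. With a separate "replies" topic, the request never reaches the reply consumer and the reply is matched as intended.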