Tags: apache-kafka, spring-integration, spring-kafka, spring-cloud-stream

Spring Boot Kafka request-reply scenario


I am implementing a POC of a request/reply scenario in order to move to an event-based microservice stack using Kafka.

There are two options in Spring, and I wonder which one is better to use: ReplyingKafkaTemplate or Spring Cloud Stream.

The first is ReplyingKafkaTemplate, which can easily be configured so that each instance has its own dedicated reply topic. The requester sets the reply topic as a record header:

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes()));

The consumer does not need to know the reply topic name; it just listens on the request topic and replies to whatever reply topic is given:

@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
       .....
}
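
For reference, here is a minimal sketch of the requesting side. Apart from ReplyingKafkaTemplate, RequestReplyFuture and KafkaHeaders (Spring Kafka classes), the names are assumptions from my setup (the container factory, the "concat-request" topic and the provider bean), so treat it as an illustration rather than a drop-in configuration:

@Bean
public ReplyingKafkaTemplate<String, ConcatModel, ConcatReply> replyingTemplate(
        ProducerFactory<String, ConcatModel> pf,
        ConcurrentKafkaListenerContainerFactory<String, ConcatReply> factory) {
    // each instance consumes its own reply topic, e.g. "concat-reply-<instanceId>"
    ConcurrentMessageListenerContainer<String, ConcatReply> replies =
            factory.createContainer(provider.getReplyChannelName());
    replies.getContainerProperties().setGroupId(provider.getReplyChannelName());
    return new ReplyingKafkaTemplate<>(pf, replies);
}

public ConcatReply concat(ConcatModel request) throws Exception {
    ProducerRecord<String, ConcatModel> record = new ProducerRecord<>("concat-request", request);
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes()));
    // sendAndReceive correlates the reply via the correlation-id header set by the template
    RequestReplyFuture<String, ConcatModel, ConcatReply> future = replyingTemplate.sendAndReceive(record);
    return future.get(5, TimeUnit.SECONDS).value();
}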

The second option is a combination of StreamListener, Spring Integration and IntegrationFlows. A gateway has to be configured and the replies have to be filtered:

@MessagingGateway
public interface StreamGateway {
    @Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
    String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() { 
    return IntegrationFlows.from(START)
            .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
            .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID, instanceUUID))
            .channel(Channels.REQUEST)
            .get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
    return IntegrationFlows.from(GatewayChannels.REPLY)
            .filter(Message.class, message -> instanceUUID.equals(message.getHeaders().get(Channels.INSTANCE_ID)))
            .channel(FILTER)
            .get();
}
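
For context, the Channels constants used above are not shown here; the sketch below is only a guess at their shape (the binding names are assumptions, only the header name has to match what the filter reads):

// a sketch of the requesting side's bindings; the responding side mirrors them
// (REQUEST as an input, REPLY as an output)
public interface Channels {
    String REQUEST = "concatRequest";   // binding that carries requests
    String REPLY = "concatReply";       // binding that carries replies from all instances
    String INSTANCE_ID = "instanceId";  // header used to route a reply back to the right instance

    @Output(REQUEST)
    MessageChannel request();

    @Input(REPLY)
    SubscribableChannel reply();
}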

Building the reply:

@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) {
    // createReply() is a placeholder; copying the headers keeps instanceId on the reply so the filter can match it
    return MessageBuilder.withPayload(createReply(request.getPayload())).copyHeaders(request.getHeaders()).build();
}
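
On the calling side, the gateway is then invoked like a plain method (a small sketch; where the payload comes from is up to the caller):

@Autowired
private StreamGateway gateway;

public String concat(String payload) {
    // blocks until a reply passes the instanceId filter, or times out after the 5 s replyTimeout
    return gateway.process(payload);
}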

Specifying a reply channel is mandatory, so received replies are filtered by instance ID, which is a kind of workaround (it might bloat the network). On the other hand, a DLQ scenario is enabled by adding

              consumer:
                enableDlq: true

Using Spring Cloud Stream looks promising in terms of interoperability with RabbitMQ and other features, but it does not officially support the request/reply scenario out of the box. The issue is still open, and not rejected either. (https://github.com/spring-cloud/spring-cloud-stream/issues/1800)

Any suggestions are welcome.


Solution

  • Spring Cloud Stream is not designed for request/reply; it can be done, but it is not straightforward and you have to write code.

    With @KafkaListener the framework takes care of everything for you.

    If you want it to work with RabbitMQ too, you can annotate it with @RabbitListener as well.
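
    For example, a single handler exposed over both brokers could look like the sketch below (the Rabbit queue name and buildReply() are placeholders; @RabbitListener needs spring-rabbit on the classpath):

    @KafkaListener(topics = "${kafka.topic.concat-request}")
    @RabbitListener(queues = "concat-request")
    @SendTo
    public ConcatReply listen(ConcatModel request) {
        return buildReply(request);  // buildReply() stands in for the actual concatenation logic
    }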