Search code examples
spring-cloud-stream

MessagingGateway request/response best approach


I'm trying to have a webflux boot app make calls to a separate SCS boot app. The most applicable previous question I found was this one. Is there a way to create a Request/Response MessagingGateway that doesn't use the (@Input,@Output,@EnableBinding) deprecated annotations?

I want to use the newer functional style in both apps if possible. So far this maven project is my best/only working result. I've tried all kinds of techniques including making the gateway return WebFlux message types directly. If needed, I can dig those up but I figured it might be easier to push what I have that works rather than clutter this question with my graveyard.

Thanks, Glenn


Solution

  • Here is a sample how to configure a request-reply gateway with Spring Cloud Stream:

    @SpringBootApplication
    public class SpringCloudStreamRequestReplyApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringCloudStreamRequestReplyApplication.class, args);
        }
    
        @Bean
        IntegrationFlow requestFlow(StreamBridge streamBridge) {
            return IntegrationFlows.from(UpperCaseGateway.class)
                    .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                    .handle(m -> streamBridge.send("requests", m))
                    .get();
        }
    
        @Bean
        IntegrationFlow repliesFlow(HeaderChannelRegistry channelRegistry) {
            return IntegrationFlows.from(MessageConsumer.class, gateway -> gateway.beanName("replies"))
                    .filter(Message.class,
                            m -> channelRegistry.channelNameToChannel(m.getHeaders().getReplyChannel().toString()) != null,
                            filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true))
                    .get();
        }
    
        public interface UpperCaseGateway {
    
            String toUpperCase(String payload);
    
        }
    
        public interface MessageConsumer extends Consumer<Message<String>> {
    
        }
    
    }
    

    So, in this case we call UpperCaseGateway.toUpperCase() API which is going to send a request over Spring Cloud Stream binder into a requests destination.

    The replies gateway for a one-way MessageConsumer represents a Spring Cloud Stream functional binding with this property:

    spring.cloud.stream.bindings.replies-in-0.destination=replies
    

    At this point we correlated replies with their requests and use a built-in replying feature from Spring Integration.

    The other side (I call it "server") has to deal with a Message to carry on required correlation headers and musts receive requests from the requests destination and reply to the replies destination.