I'm trying to have a WebFlux Spring Boot app make calls to a separate Spring Cloud Stream (SCS) Boot app. The most applicable previous question I found was this one. Is there a way to create a request/response MessagingGateway that doesn't use the deprecated annotations (@Input, @Output, @EnableBinding)?
I want to use the newer functional style in both apps if possible. So far this Maven project is my best (and only) working result. I've tried all kinds of techniques, including making the gateway return WebFlux message types directly. If needed, I can dig those up, but I figured it would be easier to push what I have that works rather than clutter this question with my graveyard.
Thanks, Glenn
Here is a sample of how to configure a request-reply gateway with Spring Cloud Stream:
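import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.messaging.Message;

@SpringBootApplication
public class SpringCloudStreamRequestReplyApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamRequestReplyApplication.class, args);
    }

    // Outbound: gateway call -> store the reply channel as a string header -> send to "requests".
    @Bean
    IntegrationFlow requestFlow(StreamBridge streamBridge) {
        return IntegrationFlows.from(UpperCaseGateway.class)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .handle(m -> streamBridge.send("requests", m))
                .get();
    }

    // Inbound: exposed as the "replies" function bean; rejects replies whose reply channel is no longer registered.
    @Bean
    IntegrationFlow repliesFlow(HeaderChannelRegistry channelRegistry) {
        return IntegrationFlows.from(MessageConsumer.class, gateway -> gateway.beanName("replies"))
                .filter(Message.class,
                        m -> channelRegistry.channelNameToChannel(m.getHeaders().getReplyChannel().toString()) != null,
                        filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true))
                .get();
    }

    public interface UpperCaseGateway {
        String toUpperCase(String payload);
    }

    public interface MessageConsumer extends Consumer<Message<String>> {
    }

}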
So, in this case we call the UpperCaseGateway.toUpperCase() API, which sends a request through the Spring Cloud Stream binder to the requests destination.
The replies gateway for the one-way MessageConsumer represents a Spring Cloud Stream functional binding, configured with this property:
spring.cloud.stream.bindings.replies-in-0.destination=replies
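At this point we correlate replies with their requests and rely on the built-in reply handling in Spring Integration: the reply channel that headerChannelsToString stored as a string header is resolved back through the HeaderChannelRegistry, and the reply is delivered to the waiting gateway call.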
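To make the correlation idea concrete, here is a minimal plain-Java model of it. It is not Spring Integration code: the Msg class, the registry map and all method names are hypothetical stand-ins, and a CompletableFuture plays the role of the gateway's temporary reply channel. The sketch also removes the registry entry on first use, which is a simplification.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ReplyCorrelationSketch {

    /** Hypothetical stand-in for a message: a payload plus string headers. */
    static final class Msg {
        final String payload;
        final Map<String, String> headers;

        Msg(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Stand-in for a header channel registry: generated name -> waiting caller. */
    static final Map<String, CompletableFuture<String>> registry = new ConcurrentHashMap<>();

    /** Client, outbound: register the waiting caller and carry only its name in a header. */
    static Msg toRequest(String payload, CompletableFuture<String> caller) {
        String name = UUID.randomUUID().toString();
        registry.put(name, caller);
        return new Msg(payload, Map.of("replyChannel", name));
    }

    /** Server: do the work and copy the request headers onto the reply. */
    static Msg server(Msg request) {
        return new Msg(request.payload.toUpperCase(), request.headers);
    }

    /** Client, inbound: resolve the name back to the caller, rejecting unknown replies. */
    static void onReply(Msg reply) {
        String name = reply.headers.get("replyChannel");
        CompletableFuture<String> caller = (name == null) ? null : registry.remove(name);
        if (caller == null) {
            throw new IllegalStateException("No caller is waiting for this reply");
        }
        caller.complete(reply.payload);
    }

    /** The whole round trip, run in-process instead of over a binder. */
    static String toUpperCase(String payload) {
        CompletableFuture<String> caller = new CompletableFuture<>();
        onReply(server(toRequest(payload, caller)));
        return caller.join();
    }

    public static void main(String[] args) {
        System.out.println(toUpperCase("hello")); // prints HELLO
    }
}
```

The point of the model is that only a string travels with the message, so the reply can cross a broker and still find the caller that is waiting for it, provided the other side returns the header unchanged.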
The other side (I call it the "server") has to deal with a Message so that the required correlation headers are carried over. It must receive requests from the requests destination and reply to the replies destination.
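For the server side, the bindings might look like the properties below. This is only a sketch under assumptions: the function bean name upperCase is hypothetical (it is not in the sample above), and it is assumed to be a Function<Message<String>, Message<String>> that copies the request headers onto the reply it returns, for example with MessageBuilder.copyHeaders(...).

```properties
# Hypothetical function bean name; adjust to whatever the server app actually declares.
spring.cloud.function.definition=upperCase
# Consume requests from the destination the client sends to...
spring.cloud.stream.bindings.upperCase-in-0.destination=requests
# ...and publish the function's output to the destination the client listens on.
spring.cloud.stream.bindings.upperCase-out-0.destination=replies
```

The binding names follow the same functional naming convention as replies-in-0 on the client side: the function name, then in or out, then the argument index.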